hyperactor/
proc.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! This module provides [`Proc`], which is the runtime used within a single
10//! proc.
11
12// TODO: define a set of proc errors and plumb these throughout
13
14use std::any::Any;
15use std::any::TypeId;
16use std::collections::HashMap;
17use std::fmt;
18use std::future::Future;
19use std::hash::Hash;
20use std::hash::Hasher;
21use std::ops::Deref;
22use std::panic;
23use std::panic::AssertUnwindSafe;
24use std::panic::Location;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::sync::OnceLock;
28use std::sync::Weak;
29use std::sync::atomic::AtomicU64;
30use std::sync::atomic::AtomicUsize;
31use std::sync::atomic::Ordering;
32use std::time::Duration;
33use std::time::SystemTime;
34
35use async_trait::async_trait;
36use dashmap::DashMap;
37use dashmap::mapref::entry::Entry;
38use dashmap::mapref::multiple::RefMulti;
39use futures::FutureExt;
40use hyperactor_config::AttrValue;
41use hyperactor_config::attrs::Attrs;
42use hyperactor_config::attrs::declare_attrs;
43use hyperactor_telemetry::recorder;
44use hyperactor_telemetry::recorder::Recording;
45use serde::Deserialize;
46use serde::Serialize;
47use tokio::sync::mpsc;
48use tokio::sync::watch;
49use tokio::task::JoinHandle;
50use tracing::Instrument;
51use tracing::Level;
52use tracing::Span;
53use typeuri::Named as _;
54use uuid::Uuid;
55use wirevalue::TypeInfo;
56
57use crate as hyperactor;
58use crate::Actor;
59use crate::ActorRef;
60use crate::Handler;
61use crate::Message;
62use crate::RemoteMessage;
63use crate::actor::ActorError;
64use crate::actor::ActorErrorKind;
65use crate::actor::ActorHandle;
66use crate::actor::ActorStatus;
67use crate::actor::Binds;
68use crate::actor::Referable;
69use crate::actor::RemoteHandles;
70use crate::actor::Signal;
71use crate::channel;
72use crate::channel::ChannelAddr;
73use crate::channel::ChannelError;
74use crate::clock::Clock;
75use crate::clock::ClockKind;
76use crate::clock::RealClock;
77use crate::config;
78use crate::context;
79use crate::mailbox::BoxedMailboxSender;
80use crate::mailbox::DeliveryError;
81use crate::mailbox::DialMailboxRouter;
82use crate::mailbox::IntoBoxedMailboxSender as _;
83use crate::mailbox::Mailbox;
84use crate::mailbox::MailboxMuxer;
85use crate::mailbox::MailboxSender;
86use crate::mailbox::MailboxServer as _;
87use crate::mailbox::MessageEnvelope;
88use crate::mailbox::OncePortHandle;
89use crate::mailbox::OncePortReceiver;
90use crate::mailbox::PanickingMailboxSender;
91use crate::mailbox::PortHandle;
92use crate::mailbox::PortReceiver;
93use crate::mailbox::Undeliverable;
94use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
95use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
96use crate::metrics::ACTOR_MESSAGES_RECEIVED;
97use crate::ordering::OrderedSender;
98use crate::ordering::OrderedSenderError;
99use crate::ordering::Sequencer;
100use crate::ordering::ordered_channel;
101use crate::panic_handler;
102use crate::reference::ActorId;
103use crate::reference::Index;
104use crate::reference::PortId;
105use crate::reference::ProcId;
106use crate::reference::id;
107use crate::supervision::ActorSupervisionEvent;
108
/// Process-wide counter from which [`Proc::local`] draws the rank of each
/// newly created local-only proc.
static NEXT_LOCAL_RANK: AtomicUsize = AtomicUsize::new(0);
111
112/// A proc instance is the runtime managing a single proc in Hyperactor.
113/// It is responsible for spawning actors in the proc, multiplexing messages
114/// to/within actors in the proc, and providing fallback routing to external
115/// procs.
116///
117/// Procs are also responsible for maintaining the local supervision hierarchy.
118#[derive(Clone)]
119pub struct Proc {
120    inner: Arc<ProcState>,
121}
122
123impl fmt::Debug for Proc {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.debug_struct("Proc")
126            .field("proc_id", &self.inner.proc_id)
127            .finish()
128    }
129}
130
131struct ProcState {
132    /// The proc's id. This should be globally unique, but is not (yet)
133    /// for local-only procs.
134    proc_id: ProcId,
135
136    /// A muxer instance that has entries for every actor managed by
137    /// the proc.
138    proc_muxer: MailboxMuxer,
139
140    /// Sender used to forward messages outside of the proc.
141    forwarder: BoxedMailboxSender,
142
143    /// All of the roots (i.e., named actors with pid=0) in the proc.
144    /// These are also known as "global actors", since they may be
145    /// spawned remotely.
146    roots: DashMap<String, AtomicUsize>,
147
148    /// Keep track of all of the active actors in the proc.
149    ledger: ActorLedger,
150
151    instances: DashMap<ActorId, WeakInstanceCell>,
152
153    /// Used by root actors to send events to the actor coordinating
154    /// supervision of root actors in this proc.
155    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,
156
157    clock: ClockKind,
158}
159
160impl Drop for ProcState {
161    fn drop(&mut self) {
162        // We only want log ProcStatus::Dropped when ProcState is dropped,
163        // rather than Proc is dropped. This is because we need to wait for
164        // Proc::inner's ref count becomes 0.
165        tracing::info!(
166            proc_id = %self.proc_id,
167            name = "ProcStatus",
168            status = "Dropped"
169        );
170    }
171}
172
173/// A snapshot view of the proc's actor ledger.
174#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
175pub struct ActorLedgerSnapshot {
176    /// All the actor trees in the proc, mapping the root id to the root
177    /// of each tree.
178    pub roots: HashMap<ActorId, ActorTreeSnapshot>,
179}
180
181/// A event for one row of log.
182#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
183pub struct Event {
184    /// Time when the event happend.
185    pub time: SystemTime,
186    /// The payload of the event.
187    pub fields: Vec<(String, recorder::Value)>,
188    /// The sequence number of the event.
189    pub seq: usize,
190}
191
192impl From<recorder::Event> for Event {
193    fn from(event: recorder::Event) -> Event {
194        Event {
195            time: event.time,
196            fields: event.fields(),
197            seq: event.seq,
198        }
199    }
200}
201
202/// A snapshot of an actor tree (rooted at a pid=0 actor).
203#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
204pub struct ActorTreeSnapshot {
205    /// The PID of this actor.
206    pub pid: Index,
207
208    /// The type name of the actor. If the actor is [`typeuri::Named`], then
209    /// this is the registered name; otherwise it is the actor type's
210    /// [`std::any::type_name`].
211    pub type_name: String,
212
213    /// The actor's current status.
214    pub status: ActorStatus,
215
216    /// Various operational stats for the actor.
217    pub stats: ActorStats,
218
219    /// This actor's handlers, mapping port numbers to the named type handled.
220    pub handlers: HashMap<u64, String>,
221
222    /// This actor's children.
223    pub children: HashMap<Index, ActorTreeSnapshot>,
224
225    /// Recent events emitted by the actor's logging.
226    pub events: Vec<Event>,
227
228    /// The current set of spans entered by the actor. These should be active
229    /// only while the actor is entered in a handler.
230    pub spans: Vec<Vec<String>>,
231}
232
233impl Hash for ActorTreeSnapshot {
234    fn hash<H: Hasher>(&self, state: &mut H) {
235        self.pid.hash(state);
236    }
237}
238
239/// Operational stats for an actor instance.
240#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
241#[derive(Default)]
242pub struct ActorStats {
243    /// The number of messages processed by the actor.
244    num_processed_messages: u64,
245}
246
247impl fmt::Display for ActorStats {
248    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249        write!(f, "num_processed_messages={}", self.num_processed_messages)
250    }
251}
252
253#[derive(Debug)]
254struct ActorLedger {
255    // Root actors. Map's value is its key's InstanceCell.
256    roots: DashMap<ActorId, WeakInstanceCell>,
257}
258
259impl ActorLedger {
260    fn new() -> Self {
261        Self {
262            roots: DashMap::new(),
263        }
264    }
265
266    fn insert(
267        &self,
268        root_actor_id: ActorId,
269        root_actor_cell: WeakInstanceCell,
270    ) -> Result<(), anyhow::Error> {
271        match self.roots.insert(root_actor_id.clone(), root_actor_cell) {
272            None => Ok(()),
273            // This should never happen because we do not recycle root actor's
274            // IDs.
275            Some(current_cell) => {
276                let debugging_msg = match current_cell.upgrade() {
277                    Some(cell) => format!("the stored cell's actor ID is {}", cell.actor_id()),
278                    None => "the stored cell has been dropped".to_string(),
279                };
280
281                Err(anyhow::anyhow!(
282                    "actor '{root_actor_id}' has already been added to ledger: {debugging_msg}"
283                ))
284            }
285        }
286    }
287
288    /// Get a snapshot view of this ledger.
289    fn snapshot(&self) -> ActorLedgerSnapshot {
290        let roots = self
291            .roots
292            .iter()
293            .flat_map(|r| {
294                let (actor_id, weak_cell) = r.pair();
295                // The actor might have been stopped or errored out. Since we do
296                // not remove inactive actors from ledger, the upgrade() call
297                // will return None in that scenario.
298                weak_cell
299                    .upgrade()
300                    .map(|cell| (actor_id.clone(), Self::get_actor_tree_snapshot(&cell)))
301            })
302            .collect();
303
304        ActorLedgerSnapshot { roots }
305    }
306
307    fn get_actor_tree_snapshot(cell: &InstanceCell) -> ActorTreeSnapshot {
308        // Get the edges between this actor and its children.
309        let children = cell
310            .child_iter()
311            .map(|child| (child.pid(), Self::get_actor_tree_snapshot(child.value())))
312            .collect();
313
314        ActorTreeSnapshot {
315            pid: cell.actor_id().pid(),
316            type_name: cell.inner.actor_type.type_name().to_string(),
317            status: cell.status().borrow().clone(),
318            stats: ActorStats {
319                num_processed_messages: cell.inner.num_processed_messages.load(Ordering::SeqCst),
320            },
321            handlers: cell
322                .inner
323                .exported_named_ports
324                .iter()
325                .map(|entry| (*entry.key(), entry.value().to_string()))
326                .collect(),
327            children,
328            events: cell
329                .inner
330                .recording
331                .tail()
332                .into_iter()
333                .map(Event::from)
334                .collect(),
335            spans: cell
336                .inner
337                .recording
338                .stacks()
339                .into_iter()
340                .map(|stack| {
341                    stack
342                        .into_iter()
343                        .map(|meta| meta.name().to_string())
344                        .collect()
345                })
346                .collect(),
347        }
348    }
349}
350
351impl Proc {
352    /// Create a new proc with the given proc id and forwarder.
353    pub fn new(proc_id: ProcId, forwarder: BoxedMailboxSender) -> Self {
354        Self::new_with_clock(proc_id, forwarder, ClockKind::default())
355    }
356
357    /// Create a new direct-addressed proc.
358    pub fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
359        let (addr, rx) = channel::serve(addr)?;
360        let proc_id = ProcId::Direct(addr, name);
361        let proc = Self::new(proc_id, DialMailboxRouter::new().into_boxed());
362        proc.clone().serve(rx);
363        Ok(proc)
364    }
365
366    /// Create a new direct-addressed proc with a default sender for the forwarder.
367    pub fn direct_with_default(
368        addr: ChannelAddr,
369        name: String,
370        default: BoxedMailboxSender,
371    ) -> Result<Self, ChannelError> {
372        let (addr, rx) = channel::serve(addr)?;
373        let proc_id = ProcId::Direct(addr, name);
374        let proc = Self::new(
375            proc_id,
376            DialMailboxRouter::new_with_default(default).into_boxed(),
377        );
378        proc.clone().serve(rx);
379        Ok(proc)
380    }
381
382    /// Create a new proc with the given proc id, forwarder and clock kind.
383    pub fn new_with_clock(
384        proc_id: ProcId,
385        forwarder: BoxedMailboxSender,
386        clock: ClockKind,
387    ) -> Self {
388        tracing::info!(
389            proc_id = %proc_id,
390            name = "ProcStatus",
391            status = "Created"
392        );
393        Self {
394            inner: Arc::new(ProcState {
395                proc_id,
396                proc_muxer: MailboxMuxer::new(),
397                forwarder,
398                roots: DashMap::new(),
399                ledger: ActorLedger::new(),
400                instances: DashMap::new(),
401                supervision_coordinator_port: OnceLock::new(),
402                clock,
403            }),
404        }
405    }
406
407    /// Set the supervision coordinator's port for this proc. Return Err if it is
408    /// already set.
409    pub fn set_supervision_coordinator(
410        &self,
411        port: PortHandle<ActorSupervisionEvent>,
412    ) -> Result<(), anyhow::Error> {
413        self.state()
414            .supervision_coordinator_port
415            .set(port)
416            .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
417    }
418
419    /// Handle a supervision event received by the proc. Attempt to forward it to the
420    /// supervision coordinator port if one is set, otherwise crash the process.
421    pub fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
422        let result = match self.state().supervision_coordinator_port.get() {
423            Some(port) => port.send(event.clone()).map_err(anyhow::Error::from),
424            None => Err(anyhow::anyhow!(
425                "coordinator port is not set for proc {}",
426                self.proc_id(),
427            )),
428        };
429        if let Err(err) = result {
430            tracing::error!(
431                "proc {}: could not propagate supervision event {} due to error: {:?}: crashing",
432                self.proc_id(),
433                event,
434                err
435            );
436
437            std::process::exit(1);
438        }
439    }
440
441    /// Create a new local-only proc. This proc is not allowed to forward messages
442    /// outside of the proc itself.
443    pub fn local() -> Self {
444        // TODO: name these something that is ~ globally unique, e.g., incorporate
445        // the hostname, some GUID, etc.
446        let proc_id = ProcId::Ranked(id!(local), NEXT_LOCAL_RANK.fetch_add(1, Ordering::Relaxed));
447        // TODO: make it so that local procs can talk to each other.
448        Proc::new(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
449    }
450
451    /// The proc's ID.
452    pub fn proc_id(&self) -> &ProcId {
453        &self.state().proc_id
454    }
455
456    /// Shared sender used by the proc to forward messages to remote
457    /// destinations.
458    pub fn forwarder(&self) -> &BoxedMailboxSender {
459        &self.inner.forwarder
460    }
461
462    /// Convenience accessor for state.
463    fn state(&self) -> &ProcState {
464        self.inner.as_ref()
465    }
466
467    /// The proc's clock.
468    pub fn clock(&self) -> &ClockKind {
469        &self.state().clock
470    }
471
472    /// Get the snapshot of the ledger.
473    pub fn ledger_snapshot(&self) -> ActorLedgerSnapshot {
474        self.state().ledger.snapshot()
475    }
476
477    /// Attach a mailbox to the proc with the provided root name.
478    pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
479        let actor_id: ActorId = self.allocate_root_id(name)?;
480        Ok(self.bind_mailbox(actor_id))
481    }
482
483    /// Attach a mailbox to the proc as a child actor.
484    pub fn attach_child(&self, parent_id: &ActorId) -> Result<Mailbox, anyhow::Error> {
485        let actor_id: ActorId = self.allocate_child_id(parent_id)?;
486        Ok(self.bind_mailbox(actor_id))
487    }
488
489    /// Bind a mailbox to the proc.
490    fn bind_mailbox(&self, actor_id: ActorId) -> Mailbox {
491        let mbox = Mailbox::new(actor_id, BoxedMailboxSender::new(self.downgrade()));
492
493        // TODO: T210748165 tie the muxer entry to the lifecycle of the mailbox held
494        // by the caller. This will likely require a weak reference.
495        self.state().proc_muxer.bind_mailbox(mbox.clone());
496        mbox
497    }
498
499    /// Attach a mailbox to the proc with the provided root name, and bind an [`ActorRef`].
500    /// This is intended only for testing, and will be replaced by simpled utilities.
501    pub fn attach_actor<R, M>(
502        &self,
503        name: &str,
504    ) -> Result<(Instance<()>, ActorRef<R>, PortReceiver<M>), anyhow::Error>
505    where
506        M: RemoteMessage,
507        R: Referable + RemoteHandles<M>,
508    {
509        let (instance, _handle) = self.instance(name)?;
510        let (_handle, rx) = instance.bind_actor_port::<M>();
511        let actor_ref = ActorRef::attest(instance.self_id().clone());
512        Ok((instance, actor_ref, rx))
513    }
514
515    /// Spawn a named (root) actor on this proc. The name of the actor must be
516    /// unique.
517    pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> Result<ActorHandle<A>, anyhow::Error> {
518        let actor_id = self.allocate_root_id(name)?;
519        let span = tracing::span!(
520            Level::INFO,
521            "spawn_actor",
522            actor_name = name,
523            actor_type = std::any::type_name::<A>(),
524            actor_id = actor_id.to_string(),
525        );
526        let (instance, mut actor_loop_receivers, work_rx) = {
527            let _guard = span.clone().entered();
528            Instance::new(self.clone(), actor_id.clone(), false, None)
529        };
530        // Add this actor to the proc's actor ledger. We do not actively remove
531        // inactive actors from ledger, because the actor's state can be inferred
532        // from its weak cell.
533        self.state()
534            .ledger
535            .insert(actor_id.clone(), instance.inner.cell.downgrade())?;
536
537        Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx))
538    }
539
540    /// Wrapper for [`Proc::actor_instance::<()>`].
541    pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
542        let (instance, handle, ..) = self.actor_instance(name)?;
543
544        Ok((instance, handle))
545    }
546
547    /// Create and return an actor instance, its corresponding handle, its signal port receiver,
548    /// its supervision port receiver, and its message receiver. This allows actors to be
549    /// "inverted": the caller can use the returned [`Instance`] to send and receive messages,
550    /// launch child actors, etc. The actor itself does not handle any messages unless driven by
551    /// the caller. Otherwise the instance acts as a normal actor, and can be referenced and
552    /// stopped.
553    pub fn actor_instance<A: Actor>(
554        &self,
555        name: &str,
556    ) -> Result<
557        (
558            Instance<A>,
559            ActorHandle<A>,
560            PortReceiver<ActorSupervisionEvent>,
561            PortReceiver<Signal>,
562            mpsc::UnboundedReceiver<WorkCell<A>>,
563        ),
564        anyhow::Error,
565    > {
566        let actor_id = self.allocate_root_id(name)?;
567        let span = tracing::debug_span!(
568            "actor_instance",
569            actor_name = name,
570            actor_type = std::any::type_name::<A>(),
571            actor_id = actor_id.to_string(),
572        );
573        let _guard = span.enter();
574        let (instance, actor_loop_receivers, work_rx) =
575            Instance::new(self.clone(), actor_id.clone(), false, None);
576        let (signal_rx, supervision_rx) = actor_loop_receivers.unwrap();
577        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
578        instance.change_status(ActorStatus::Client);
579        Ok((instance, handle, supervision_rx, signal_rx, work_rx))
580    }
581
582    /// Create a child instance. Called from `Instance`.
583    fn child_instance(
584        &self,
585        parent: InstanceCell,
586    ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
587        let actor_id = self.allocate_child_id(parent.actor_id())?;
588        let _ = tracing::debug_span!(
589            "child_actor_instance",
590            parent_actor_id = %parent.actor_id(),
591            actor_type = std::any::type_name::<()>(),
592            actor_id = %actor_id,
593        );
594
595        let (instance, _, _) = Instance::new(self.clone(), actor_id, false, Some(parent));
596        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
597        instance.change_status(ActorStatus::Client);
598        Ok((instance, handle))
599    }
600
601    /// Spawn a child actor from the provided parent on this proc. The parent actor
602    /// must already belong to this proc, a fact which is asserted in code.
603    ///
604    /// When spawn_child returns, the child has an associated cell and is linked
605    /// with its parent.
606    fn spawn_child<A: Actor>(
607        &self,
608        parent: InstanceCell,
609        actor: A,
610    ) -> Result<ActorHandle<A>, anyhow::Error> {
611        let actor_id = self.allocate_child_id(parent.actor_id())?;
612        let (instance, mut actor_loop_receivers, work_rx) =
613            Instance::new(self.clone(), actor_id, false, Some(parent.clone()));
614        Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx))
615    }
616
617    /// Call `abort` on the `JoinHandle` associated with the given
618    /// root actor. If successful return `Some(root.clone())` else
619    /// `None`.
620    pub fn abort_root_actor(
621        &self,
622        root: &ActorId,
623        this_handle: Option<&JoinHandle<()>>,
624    ) -> Option<impl Future<Output = ActorId>> {
625        self.state()
626            .ledger
627            .roots
628            .get(root)
629            .into_iter()
630            .flat_map(|e| e.upgrade())
631            .map(|cell| {
632                let r1 = root.clone();
633                let r2 = root.clone();
634                // If abort_root_actor was called from inside an actor task, we don't want to abort that actor's task yet.
635                let skip_abort = this_handle.is_some_and(|this_h| {
636                    cell.inner
637                        .actor_task_handle
638                        .get()
639                        .is_some_and(|other_h| std::ptr::eq(this_h, other_h))
640                });
641                // `start` was called on the actor's instance
642                // immediately following `root`'s insertion into the
643                // ledger. `Instance::start()` is infallible and should
644                // complete quickly, so calling `wait()` on `actor_task_handle`
645                // should be safe (i.e., not hang forever).
646                async move {
647                    tokio::task::spawn_blocking(move || {
648                        if !skip_abort {
649                            let h = cell.inner.actor_task_handle.wait();
650                            tracing::debug!("{}: aborting {:?}", r1, h);
651                            h.abort();
652                        }
653                    })
654                    .await
655                    .unwrap();
656                    r2
657                }
658            })
659            .next()
660    }
661
662    /// Signals to a root actor to stop,
663    /// returning a status observer if successful.
664    pub fn stop_actor(&self, actor_id: &ActorId) -> Option<watch::Receiver<ActorStatus>> {
665        if let Some(entry) = self.state().ledger.roots.get(actor_id) {
666            match entry.value().upgrade() {
667                None => None, // the root's cell has been dropped
668                Some(cell) => {
669                    tracing::info!("sending stop signal to {}", cell.actor_id());
670                    if let Err(err) = cell.signal(Signal::DrainAndStop) {
671                        tracing::error!(
672                            "{}: failed to send stop signal to pid {}: {:?}",
673                            self.proc_id(),
674                            cell.pid(),
675                            err
676                        );
677                        None
678                    } else {
679                        Some(cell.status().clone())
680                    }
681                }
682            }
683        } else {
684            tracing::error!("no actor {} found in {} roots", actor_id, self.proc_id());
685            None
686        }
687    }
688
689    /// Stop the proc. Returns a pair of:
690    /// - the actors observed to stop;
691    /// - the actors not observed to stop when timeout.
692    ///
693    /// If `cx` is specified, it means this method was called from inside an actor
694    /// in which case we shouldn't wait for it to stop and need to delay aborting
695    /// its task.
696    pub async fn destroy_and_wait<A: Actor>(
697        &mut self,
698        timeout: Duration,
699        cx: Option<&Context<'_, A>>,
700    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
701        self.destroy_and_wait_except_current::<A>(timeout, cx, false)
702            .await
703    }
704
705    /// Stop the proc. Returns a pair of:
706    /// - the actors observed to stop;
707    /// - the actors not observed to stop when timeout.
708    ///
709    /// If `cx` is specified, it means this method was called from inside an actor
710    /// in which case we shouldn't wait for it to stop and need to delay aborting
711    /// its task.
712    /// If except_current is true, don't stop the actor represented by "cx" at
713    /// all.
714    #[hyperactor::instrument]
715    pub async fn destroy_and_wait_except_current<A: Actor>(
716        &mut self,
717        timeout: Duration,
718        cx: Option<&Context<'_, A>>,
719        except_current: bool,
720    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
721        tracing::debug!("{}: proc stopping", self.proc_id());
722
723        let (this_handle, this_actor_id) = cx.map_or((None, None), |cx| {
724            (
725                Some(cx.actor_task_handle().expect("cannot call destroy_and_wait from inside an actor unless actor has finished starting")),
726                Some(cx.self_id())
727            )
728        });
729
730        let mut statuses = HashMap::new();
731        for actor_id in self
732            .state()
733            .ledger
734            .roots
735            .iter()
736            .map(|entry| entry.key().clone())
737            .collect::<Vec<_>>()
738        {
739            if let Some(status) = self.stop_actor(&actor_id) {
740                statuses.insert(actor_id, status);
741            }
742        }
743        tracing::debug!("{}: proc stopped", self.proc_id());
744
745        let waits: Vec<_> = statuses
746            .iter_mut()
747            .filter(|(actor_id, _)| Some(*actor_id) != this_actor_id)
748            .map(|(actor_id, root)| {
749                let actor_id = actor_id.clone();
750                async move {
751                    RealClock
752                        .timeout(
753                            timeout,
754                            root.wait_for(|state: &ActorStatus| state.is_terminal()),
755                        )
756                        .await
757                        .ok()
758                        .map(|_| actor_id)
759                }
760            })
761            .collect();
762
763        let results = futures::future::join_all(waits).await;
764        let stopped_actors: Vec<_> = results
765            .iter()
766            .filter_map(|actor_id| actor_id.as_ref())
767            .cloned()
768            .collect();
769        let aborted_actors: Vec<_> = statuses
770            .iter()
771            .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
772            .map(|(actor_id, _)| {
773                let f = self.abort_root_actor(actor_id, this_handle);
774                async move {
775                    let _ = if let Some(f) = f { Some(f.await) } else { None };
776                    // If `is_none(&_)` then the proc's `ledger.roots`
777                    // contains an entry that wasn't a root or, the
778                    // associated actor's instance cell was already
779                    // dropped when we went to call `abort()` on the
780                    // cell's task handle.
781
782                    actor_id.clone()
783                }
784            })
785            .collect();
786        let aborted_actors = futures::future::join_all(aborted_actors).await;
787
788        if let Some(this_handle) = this_handle
789            && let Some(this_actor_id) = this_actor_id
790            && !except_current
791        {
792            tracing::debug!("{}: aborting (delayed) {:?}", this_actor_id, this_handle);
793            this_handle.abort()
794        };
795
796        tracing::info!(
797            "destroy_and_wait: {} actors stopped, {} actors aborted",
798            stopped_actors.len(),
799            aborted_actors.len()
800        );
801        Ok((stopped_actors, aborted_actors))
802    }
803
804    /// Resolve an actor reference to an actor residing on this proc.
805    /// Returns None if the actor is not found on this proc.
806    ///
807    /// Bounds:
808    /// - `R: Actor` — must be a real actor that can live in this
809    ///   proc.
810    /// - `R: Referable` — required because the input is an
811    ///   `ActorRef<R>`.
812    pub fn resolve_actor_ref<R: Actor + Referable>(
813        &self,
814        actor_ref: &ActorRef<R>,
815    ) -> Option<ActorHandle<R>> {
816        self.inner
817            .instances
818            .get(actor_ref.actor_id())?
819            .upgrade()?
820            .downcast_handle()
821    }
822
823    /// Create a root allocation in the proc.
824    fn allocate_root_id(&self, name: &str) -> Result<ActorId, anyhow::Error> {
825        let name = name.to_string();
826        match self.state().roots.entry(name.to_string()) {
827            Entry::Vacant(entry) => {
828                entry.insert(AtomicUsize::new(1));
829            }
830            Entry::Occupied(_) => {
831                anyhow::bail!("an actor with name '{}' has already been spawned", name)
832            }
833        }
834        Ok(ActorId(self.state().proc_id.clone(), name.to_string(), 0))
835    }
836
837    /// Create a child allocation in the proc.
838    #[hyperactor::instrument(fields(actor_name=parent_id.name()))]
839    pub(crate) fn allocate_child_id(&self, parent_id: &ActorId) -> Result<ActorId, anyhow::Error> {
840        assert_eq!(*parent_id.proc_id(), self.state().proc_id);
841        let pid = match self.state().roots.get(parent_id.name()) {
842            None => anyhow::bail!(
843                "no actor named {} in proc {}",
844                parent_id.name(),
845                self.state().proc_id
846            ),
847            Some(next_pid) => next_pid.fetch_add(1, Ordering::Relaxed),
848        };
849        Ok(parent_id.child_id(pid))
850    }
851
852    fn downgrade(&self) -> WeakProc {
853        WeakProc::new(self)
854    }
855}
856
857#[async_trait]
858impl MailboxSender for Proc {
859    fn post_unchecked(
860        &self,
861        envelope: MessageEnvelope,
862        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
863    ) {
864        if envelope.dest().actor_id().proc_id() == &self.state().proc_id {
865            self.state().proc_muxer.post(envelope, return_handle)
866        } else {
867            self.state().forwarder.post(envelope, return_handle)
868        }
869    }
870}
871
872#[derive(Debug)]
873struct WeakProc(Weak<ProcState>);
874
875impl WeakProc {
876    fn new(proc: &Proc) -> Self {
877        Self(Arc::downgrade(&proc.inner))
878    }
879
880    fn upgrade(&self) -> Option<Proc> {
881        self.0.upgrade().map(|inner| Proc { inner })
882    }
883}
884
885#[async_trait]
886impl MailboxSender for WeakProc {
887    fn post_unchecked(
888        &self,
889        envelope: MessageEnvelope,
890        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
891    ) {
892        match self.upgrade() {
893            Some(proc) => proc.post(envelope, return_handle),
894            None => envelope.undeliverable(
895                DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
896                return_handle,
897            ),
898        }
899    }
900}
901
902/// Represents a single work item used by the instance to dispatch to
903/// actor handles. Specifically, this enables handler polymorphism.
904pub struct WorkCell<A: Actor + Send>(
905    Box<
906        dyn for<'a> FnOnce(
907                &'a mut A,
908                &'a Instance<A>,
909            )
910                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
911            + Send
912            + Sync,
913    >,
914);
915
916impl<A: Actor + Send> WorkCell<A> {
917    /// Create a new WorkCell from a concrete function (closure).
918    fn new(
919        f: impl for<'a> FnOnce(
920            &'a mut A,
921            &'a Instance<A>,
922        )
923            -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
924        + Send
925        + Sync
926        + 'static,
927    ) -> Self {
928        Self(Box::new(f))
929    }
930
931    /// Handle the message represented by this work cell.
932    pub fn handle<'a>(
933        self,
934        actor: &'a mut A,
935        instance: &'a Instance<A>,
936    ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
937        (self.0)(actor, instance)
938    }
939}
940
941/// Context for a message currently being handled by an Instance.
942pub struct Context<'a, A: Actor> {
943    instance: &'a Instance<A>,
944    headers: Attrs,
945}
946
947impl<'a, A: Actor> Context<'a, A> {
948    /// Construct a new Context.
949    pub fn new(instance: &'a Instance<A>, headers: Attrs) -> Self {
950        Self { instance, headers }
951    }
952
953    /// Get a reference to the message headers.
954    pub fn headers(&self) -> &Attrs {
955        &self.headers
956    }
957}
958
959impl<A: Actor> Deref for Context<'_, A> {
960    type Target = Instance<A>;
961
962    fn deref(&self) -> &Self::Target {
963        self.instance
964    }
965}
966
967/// An actor instance. This is responsible for managing a running actor, including
968/// its full lifecycle, supervision, signal management, etc. Instances can represent
969/// a managed actor or a "client" actor that has joined the proc.
970pub struct Instance<A: Actor> {
971    inner: Arc<InstanceState<A>>,
972}
973
974impl<A: Actor> fmt::Debug for Instance<A> {
975    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
976        f.debug_struct("Instance").field("inner", &"..").finish()
977    }
978}
979
/// The shared state behind an [`Instance`]; instances hold it through an
/// `Arc`.
struct InstanceState<A: Actor> {
    /// The proc that owns this instance.
    proc: Proc,

    /// The instance cell that manages instance hierarchy.
    cell: InstanceCell,

    /// The mailbox associated with the actor.
    mailbox: Mailbox,

    /// The actor's ports, from which handler port handles are obtained.
    ports: Arc<Ports<A>>,

    /// A watch for communicating the actor's state.
    status_tx: watch::Sender<ActorStatus>,

    /// This instance's globally unique ID.
    id: Uuid,

    /// Used to assign sequence numbers for messages sent from this actor.
    sequencer: Sequencer,
}

impl<A: Actor> InstanceState<A> {
    /// The actor id of this instance, as recorded by its mailbox.
    fn self_id(&self) -> &ActorId {
        self.mailbox.actor_id()
    }
}
1007
1008impl<A: Actor> Drop for InstanceState<A> {
1009    fn drop(&mut self) {
1010        self.status_tx.send_if_modified(|status| {
1011            if status.is_terminal() {
1012                false
1013            } else {
1014                tracing::info!(
1015                    name = "ActorStatus",
1016                    actor_id = %self.self_id(),
1017                    actor_name = self.self_id().name(),
1018                    status = "Stopped",
1019                    prev_status = status.arm().unwrap_or("unknown"),
1020                    "instance is dropped",
1021                );
1022                *status = ActorStatus::Stopped;
1023                true
1024            }
1025        });
1026    }
1027}
1028
1029impl<A: Actor> Instance<A> {
1030    /// Create a new actor instance in Created state.
1031    fn new(
1032        proc: Proc,
1033        actor_id: ActorId,
1034        detached: bool,
1035        parent: Option<InstanceCell>,
1036    ) -> (
1037        Self,
1038        Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,
1039        mpsc::UnboundedReceiver<WorkCell<A>>,
1040    ) {
1041        // Set up messaging
1042        let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
1043        let (work_tx, work_rx) = ordered_channel(
1044            actor_id.to_string(),
1045            hyperactor_config::global::get(config::ENABLE_CLIENT_SEQ_ASSIGNMENT),
1046        );
1047        let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
1048        proc.state().proc_muxer.bind_mailbox(mailbox.clone());
1049        let (status_tx, status_rx) = watch::channel(ActorStatus::Created);
1050
1051        let actor_type = match TypeInfo::of::<A>() {
1052            Some(info) => ActorType::Named(info),
1053            None => ActorType::Anonymous(std::any::type_name::<A>()),
1054        };
1055        let actor_loop_ports = if detached {
1056            None
1057        } else {
1058            let (signal_port, signal_receiver) = ports.open_message_port().unwrap();
1059            let (supervision_port, supervision_receiver) = mailbox.open_port();
1060            Some((
1061                (signal_port, supervision_port),
1062                (signal_receiver, supervision_receiver),
1063            ))
1064        };
1065
1066        let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();
1067
1068        let cell = InstanceCell::new(
1069            actor_id,
1070            actor_type,
1071            proc.clone(),
1072            actor_loop,
1073            status_rx,
1074            parent,
1075            ports.clone(),
1076        );
1077        let instance_id = Uuid::now_v7();
1078        let inner = Arc::new(InstanceState {
1079            proc,
1080            cell,
1081            mailbox,
1082            ports,
1083            status_tx,
1084            sequencer: Sequencer::new(instance_id),
1085            id: instance_id,
1086        });
1087        (Self { inner }, actor_loop_receivers, work_rx)
1088    }
1089
1090    /// Notify subscribers of a change in the actors status and bump counters with the duration which
1091    /// the last status was active for.
1092    #[track_caller]
1093    fn change_status(&self, new: ActorStatus) {
1094        let old = self.inner.status_tx.send_replace(new.clone());
1095        // 2 cases are allowed:
1096        // * non-terminal -> non-terminal
1097        // * non-terminal -> terminal
1098        // terminal -> terminal is not allowed unless it is the same status (no-op).
1099        // terminal -> non-terminal is never allowed.
1100        assert!(
1101            !old.is_terminal() && !new.is_terminal()
1102                || !old.is_terminal() && new.is_terminal()
1103                || old == new,
1104            "actor changing status illegally, only allow non-terminal -> non-terminal \
1105            and non-terminal -> terminal statuses. actor_id={}, prev_status={}, status={}",
1106            self.self_id(),
1107            old,
1108            new
1109        );
1110        // Actor status changes between Idle and Processing when handling every
1111        // message. It creates too many logs if we want to log these 2 states.
1112        // Also, sometimes the actor transitions from Processing -> Processing.
1113        // Therefore we skip the status changes between them.
1114        if !((old.is_idle() && new.is_processing())
1115            || (old.is_processing() && new.is_idle())
1116            || old == new)
1117        {
1118            let new_status = new.arm().unwrap_or("unknown");
1119            let change_reason = match new {
1120                ActorStatus::Failed(reason) => reason.to_string(),
1121                _ => "".to_string(),
1122            };
1123            tracing::info!(
1124                name = "ActorStatus",
1125                actor_id = %self.self_id(),
1126                actor_name = self.self_id().name(),
1127                status = new_status,
1128                prev_status = old.arm().unwrap_or("unknown"),
1129                caller = %Location::caller(),
1130                change_reason,
1131            );
1132        }
1133    }
1134
1135    fn is_terminal(&self) -> bool {
1136        self.inner.status_tx.borrow().is_terminal()
1137    }
1138
1139    fn is_stopping(&self) -> bool {
1140        self.inner.status_tx.borrow().is_stopping()
1141    }
1142
1143    /// This instance's actor ID.
1144    pub fn self_id(&self) -> &ActorId {
1145        self.inner.self_id()
1146    }
1147
1148    /// Signal the actor to stop.
1149    pub fn stop(&self) -> Result<(), ActorError> {
1150        tracing::info!("Instance::stop called, {}", self.inner.cell.actor_id());
1151        self.inner.cell.signal(Signal::DrainAndStop)
1152    }
1153
1154    /// Open a new port that accepts M-typed messages. The returned
1155    /// port may be freely cloned, serialized, and passed around. The
1156    /// returned receiver should only be retained by the actor responsible
1157    /// for processing the delivered messages.
1158    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1159        self.inner.mailbox.open_port()
1160    }
1161
1162    /// Open a new one-shot port that accepts M-typed messages. The
1163    /// returned port may be used to send a single message; ditto the
1164    /// receiver may receive a single message.
1165    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1166        self.inner.mailbox.open_once_port()
1167    }
1168
    /// Send a message to the actor running on the proc.
    ///
    /// Delegates to the `context::MailboxExt` implementation for this
    /// instance. The call is fully qualified so that it resolves to the
    /// trait method rather than to this inherent method.
    pub fn post(&self, port_id: PortId, headers: Attrs, message: wirevalue::Any) {
        // NOTE(review): the meaning of the trailing `true` flag is defined by
        // `context::MailboxExt::post` — confirm there.
        <Self as context::MailboxExt>::post(self, port_id, headers, message, true)
    }
1173
1174    /// Send a message to the actor itself with a delay usually to trigger some event.
1175    pub fn self_message_with_delay<M>(&self, message: M, delay: Duration) -> Result<(), ActorError>
1176    where
1177        M: Message,
1178        A: Handler<M>,
1179    {
1180        let port = self.port();
1181        let self_id = self.self_id().clone();
1182        let clock = self.inner.proc.state().clock.clone();
1183        tokio::spawn(async move {
1184            clock.non_advancing_sleep(delay).await;
1185            if let Err(e) = port.send(message) {
1186                // TODO: this is a fire-n-forget thread. We need to
1187                // handle errors in a better way.
1188                tracing::info!("{}: error sending delayed message: {}", self_id, e);
1189            }
1190        });
1191        Ok(())
1192    }
1193
1194    /// Start an A-typed actor onto this instance with the provided params. When spawn returns,
1195    /// the actor has been linked with its parent, if it has one.
1196    fn start(
1197        self,
1198        actor: A,
1199        actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1200        work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
1201    ) -> ActorHandle<A> {
1202        let instance_cell = self.inner.cell.clone();
1203        let actor_id = self.inner.cell.actor_id().clone();
1204        let actor_handle = ActorHandle::new(self.inner.cell.clone(), self.inner.ports.clone());
1205        let actor_task_handle = A::spawn_server_task(
1206            panic_handler::with_backtrace_tracking(self.serve(
1207                actor,
1208                actor_loop_receivers,
1209                work_rx,
1210            ))
1211            .instrument(Span::current()),
1212        );
1213        tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
1214        instance_cell
1215            .inner
1216            .actor_task_handle
1217            .set(actor_task_handle)
1218            .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));
1219
1220        actor_handle
1221    }
1222
    /// Drive `actor` to completion on this instance.
    ///
    /// Runs the actor together with its supervision tree, publishes the
    /// resulting terminal status, and then reports the outcome: to the parent
    /// when one is still linked, otherwise (for failures only) to the owning
    /// proc.
    async fn serve(
        mut self,
        mut actor: A,
        actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
    ) {
        // `run_actor_tree` borrows `work_rx` instead of taking ownership because
        // `work_rx` needs to remain alive until this function returns. If the owning
        // proc's `supervision_coordinator_port` is a port on this instance, if `work_rx`
        // is dropped before `self.proc.handle_supervision_event` is called, the process
        // will exit due to a "channel closed" failure.
        let result = self
            .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
            .await;

        // `run_actor_tree` moves the status to `Stopping` before it returns.
        assert!(self.is_stopping());
        // Publish the terminal status; `event` is `Some` only for failures.
        let event = match result {
            Ok(_) => {
                // success exit case
                self.change_status(ActorStatus::Stopped);
                None
            }
            Err(err) => {
                match *err.kind {
                    ActorErrorKind::UnhandledSupervisionEvent(box event) => {
                        // Currently only terminated actors are allowed to raise supervision events.
                        // If we want to change that in the future, we need to modify the exit
                        // status here too, because we use event's actor_status as this actor's
                        // terminal status.
                        assert!(event.actor_status.is_terminal());
                        self.change_status(event.actor_status.clone());
                        Some(event)
                    }
                    _ => {
                        // Any other failure becomes a generic `Failed` status, which
                        // is also reported in a new supervision event for this actor.
                        let error_kind = ActorErrorKind::Generic(err.kind.to_string());
                        let status = ActorStatus::Failed(error_kind);
                        self.change_status(status.clone());
                        Some(ActorSupervisionEvent::new(
                            self.inner.cell.actor_id().clone(),
                            actor.display_name(),
                            status,
                            None,
                        ))
                    }
                }
            }
        };

        if let Some(parent) = self.inner.cell.maybe_unlink_parent() {
            if let Some(event) = event {
                // Parent exists, failure should be propagated to the parent.
                parent.send_supervision_event_or_crash(event);
            }
            // TODO: we should get rid of this signal, and use *only* supervision events for
            // the purpose of conveying lifecycle changes
            if let Err(err) = parent.signal(Signal::ChildStopped(self.inner.cell.pid())) {
                tracing::error!(
                    "{}: failed to send stop message to parent pid {}: {:?}",
                    self.self_id(),
                    parent.pid(),
                    err
                );
            }
        } else {
            // Failure happened to the root actor or orphaned child actors.
            // In either case, the failure should be propagated to proc.
            //
            // Note that orphaned actor is unexpected and would only happen if
            // there is a bug.
            if let Some(event) = event {
                self.inner.proc.handle_supervision_event(event);
            }
        }
    }
1297
    /// Runs the actor, and manages its supervision tree. When the function returns,
    /// the whole tree rooted at this actor has stopped.
    ///
    /// Steps, in order:
    /// 1. run the actor, converting a panic into an `ActorError`;
    /// 2. move the status to `Stopping`;
    /// 3. send `Signal::Stop` to every child and wait for the children to be
    ///    unlinked, giving up after a 500ms wait without a signal;
    /// 4. run the actor's `cleanup`, bounded by the configured cleanup
    ///    timeout, unless the actor panicked.
    ///
    /// # Errors
    ///
    /// Returns the actor's own error if it failed; otherwise any cleanup
    /// error. A failure to receive a signal while waiting for children is
    /// returned immediately.
    async fn run_actor_tree(
        &mut self,
        actor: &mut A,
        mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
    ) -> Result<(), ActorError> {
        // It is okay to catch all panics here, because we are in a tokio task,
        // and tokio will catch the panic anyway:
        // https://docs.rs/tokio/latest/tokio/task/struct.JoinError.html#method.is_panic
        // What we do here is just to catch it early so we can handle it.

        let mut did_panic = false;
        let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers, work_rx))
            .catch_unwind()
            .await
        {
            Ok(result) => result,
            Err(_) => {
                did_panic = true;
                // Report the recorded panic info, or say why it is unavailable.
                let panic_info = panic_handler::take_panic_info()
                    .map(|info| info.to_string())
                    .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
                Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::panic(anyhow::anyhow!(panic_info)),
                ))
            }
        };

        assert!(!self.is_terminal());
        self.change_status(ActorStatus::Stopping);
        if let Err(err) = &result {
            tracing::error!("{}: actor failure: {}", self.self_id(), err);
        }

        // After this point, we know we won't spawn any more children,
        // so we can safely read the current child keys.
        let mut to_unlink = Vec::new();
        for child in self.inner.cell.child_iter() {
            if let Err(err) = child.value().signal(Signal::Stop) {
                tracing::error!(
                    "{}: failed to send stop signal to child pid {}: {:?}",
                    self.self_id(),
                    child.key(),
                    err
                );
                to_unlink.push(child.value().clone());
            }
        }
        // Manually unlink children that have already been stopped.
        for child in to_unlink {
            self.inner.cell.unlink(&child);
        }

        // Only the signal receiver is needed from here on.
        let (mut signal_receiver, _) = actor_loop_receivers;
        while self.inner.cell.child_count() > 0 {
            match RealClock
                .timeout(Duration::from_millis(500), signal_receiver.recv())
                .await
            {
                Ok(signal) => {
                    // NOTE(review): a receive error returns from the function
                    // here, which skips cleanup and discards `result` — confirm
                    // this is intended.
                    if let Signal::ChildStopped(pid) = signal? {
                        assert!(self.inner.cell.get_child(pid).is_none());
                    }
                }
                Err(_) => {
                    tracing::warn!(
                        "timeout waiting for ChildStopped signal from child on actor: {}, ignoring",
                        self.self_id()
                    );
                    // No more waiting to receive messages. Unlink all remaining
                    // children.
                    self.inner.cell.unlink_all();
                    break;
                }
            }
        }
        // Run the actor cleanup function before the actor stops to delete
        // resources. If it times out, continue with stopping the actor.
        // Don't call it if there was a panic, because the actor may
        // be in an invalid state and unable to access anything, for example
        // the GIL.
        let cleanup_result = if !did_panic {
            let cleanup_timeout = hyperactor_config::global::get(config::CLEANUP_TIMEOUT);
            match RealClock
                .timeout(cleanup_timeout, actor.cleanup(self, result.as_ref().err()))
                .await
            {
                Ok(Ok(x)) => Ok(x),
                // Cleanup ran and failed.
                Ok(Err(e)) => Err(ActorError::new(self.self_id(), ActorErrorKind::cleanup(e))),
                // Cleanup did not finish within the timeout.
                Err(e) => Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::cleanup(e.into()),
                )),
            }
        } else {
            Ok(())
        };
        if let Err(ref actor_err) = result {
            // The original result error takes precedence over the cleanup error,
            // so make sure the cleanup error is still logged in that case.
            if let Err(ref err) = cleanup_result {
                tracing::warn!(
                    cleanup_err = %err,
                    %actor_err,
                    "ignoring cleanup error after actor error",
                );
            }
        }
        // If the original exit was not an error, let cleanup errors be
        // surfaced.
        result.and(cleanup_result)
    }
1413
1414    /// Initialize and run the actor until it fails or is stopped.
1415    async fn run(
1416        &mut self,
1417        actor: &mut A,
1418        actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1419        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
1420    ) -> Result<(), ActorError> {
1421        let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;
1422
1423        self.change_status(ActorStatus::Initializing);
1424        actor
1425            .init(self)
1426            .await
1427            .map_err(|err| ActorError::new(self.self_id(), ActorErrorKind::init(err)))?;
1428        let need_drain;
1429        'messages: loop {
1430            self.change_status(ActorStatus::Idle);
1431            let metric_pairs =
1432                hyperactor_telemetry::kv_pairs!("actor_id" => self.self_id().to_string());
1433            tokio::select! {
1434                work = work_rx.recv() => {
1435                    ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
1436                    ACTOR_MESSAGE_QUEUE_SIZE.add(-1, metric_pairs);
1437                    let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
1438                    let work = work.expect("inconsistent work queue state");
1439                    if let Err(err) = work.handle(actor, self).await {
1440                        for supervision_event in supervision_event_receiver.drain() {
1441                            self.handle_supervision_event(actor, supervision_event).await?;
1442                        }
1443                        let kind = ActorErrorKind::processing(err);
1444                        return Err(ActorError {
1445                            actor_id: Box::new(self.self_id().clone()),
1446                            kind: Box::new(kind),
1447                        });
1448                    }
1449                }
1450                signal = signal_receiver.recv() => {
1451                    let signal = signal.map_err(ActorError::from);
1452                    tracing::debug!("Received signal {signal:?}");
1453                    match signal? {
1454                        signal@(Signal::Stop | Signal::DrainAndStop) => {
1455                            need_drain = matches!(signal, Signal::DrainAndStop);
1456                            break 'messages;
1457                        },
1458                        Signal::ChildStopped(pid) => {
1459                            assert!(self.inner.cell.get_child(pid).is_none());
1460                        },
1461                    }
1462                }
1463                Ok(supervision_event) = supervision_event_receiver.recv() => {
1464                    self.handle_supervision_event(actor, supervision_event).await?;
1465                }
1466            }
1467            self.inner
1468                .cell
1469                .inner
1470                .num_processed_messages
1471                .fetch_add(1, Ordering::SeqCst);
1472        }
1473
1474        if need_drain {
1475            let mut n = 0;
1476            while let Ok(work) = work_rx.try_recv() {
1477                if let Err(err) = work.handle(actor, self).await {
1478                    return Err(ActorError::new(
1479                        self.self_id(),
1480                        ActorErrorKind::processing(err),
1481                    ));
1482                }
1483                n += 1;
1484            }
1485            tracing::debug!("drained {} messages", n);
1486        }
1487        tracing::debug!("exited actor loop: {}", self.self_id());
1488        Ok(())
1489    }
1490
1491    /// Handle a supervision event using the provided actor.
1492    pub async fn handle_supervision_event(
1493        &self,
1494        actor: &mut A,
1495        supervision_event: ActorSupervisionEvent,
1496    ) -> Result<(), ActorError> {
1497        // Handle the supervision event with the current actor.
1498        match actor
1499            .handle_supervision_event(self, &supervision_event)
1500            .await
1501        {
1502            Ok(true) => {
1503                // The supervision event was handled by this actor, nothing more to do.
1504                Ok(())
1505            }
1506            Ok(false) => {
1507                let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
1508                Err(ActorError::new(self.self_id(), kind))
1509            }
1510            Err(err) => {
1511                // The actor failed to handle the supervision event, it should die.
1512                // Create a new supervision event for this failure and propagate it.
1513                let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
1514                    err.to_string(),
1515                    Box::new(supervision_event),
1516                );
1517                Err(ActorError::new(self.self_id(), kind))
1518            }
1519        }
1520    }
1521
    /// Handle one `M`-typed message with `actor`.
    ///
    /// Publishes a `Processing` status (carrying the current system time and,
    /// when `type_info` is given, the message's type name and arm), logs
    /// message latency if the headers are being sampled, and then invokes the
    /// actor's handler with a [`Context`] built from this instance and
    /// `headers`.
    ///
    /// # Safety
    ///
    /// When `type_info` is `Some`, it must be the type information for `M`:
    /// the message is passed to `arm_unchecked` as an untyped pointer.
    #[hyperactor::instrument(fields(actor_id = self.self_id().to_string(), actor_name = self.self_id().name()))]
    async unsafe fn handle_message<M: Message>(
        &self,
        actor: &mut A,
        type_info: Option<&'static TypeInfo>,
        headers: Attrs,
        message: M,
    ) -> Result<(), anyhow::Error>
    where
        A: Handler<M>,
    {
        // (type name, optional arm name) describing the message being handled.
        let handler = type_info.map(|info| {
            (
                info.typename().to_string(),
                // SAFETY: The caller promises to pass the correct type info.
                unsafe {
                    info.arm_unchecked(&message as *const M as *const ())
                        .map(str::to_string)
                },
            )
        });

        self.change_status(ActorStatus::Processing(
            self.clock().system_time_now(),
            handler,
        ));
        crate::mailbox::headers::log_message_latency_if_sampling(
            &headers,
            self.self_id().to_string(),
        );

        let context = Context::new(self, headers);
        // Pass a reference to the context to the handler, so that deref
        // coercion allows the `this` argument to be treated exactly like
        // &Instance<A>.
        actor.handle(&context, message).await
    }
1559
1560    /// Spawn on child on this instance.
1561    pub fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
1562        self.inner.proc.spawn_child(self.inner.cell.clone(), actor)
1563    }
1564
1565    /// Create a new direct child instance.
1566    pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> {
1567        self.inner.proc.child_instance(self.inner.cell.clone())
1568    }
1569
1570    /// Return a handle port handle representing the actor's message
1571    /// handler for M-typed messages.
1572    pub fn port<M: Message>(&self) -> PortHandle<M>
1573    where
1574        A: Handler<M>,
1575    {
1576        self.inner.ports.get()
1577    }
1578
1579    /// The [`ActorHandle`] corresponding to this instance.
1580    pub fn handle(&self) -> ActorHandle<A> {
1581        ActorHandle::new(self.inner.cell.clone(), Arc::clone(&self.inner.ports))
1582    }
1583
1584    /// The owning actor ref.
1585    pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
1586        self.inner.cell.bind(self.inner.ports.as_ref())
1587    }
1588
    // Temporary in order to support python bindings.
    /// Borrow this instance's mailbox.
    #[doc(hidden)]
    pub fn mailbox_for_py(&self) -> &Mailbox {
        &self.inner.mailbox
    }

    /// A reference to the proc's clock.
    ///
    /// The `use<A>` bound makes the opaque clock type capture only `A`.
    pub fn clock(&self) -> &(impl Clock + use<A>) {
        &self.inner.proc.state().clock
    }

    /// The owning proc.
    pub fn proc(&self) -> &Proc {
        &self.inner.proc
    }
1604
1605    /// Clone this Instance to get an owned struct that can be
1606    /// plumbed through python. This should really only be called
1607    /// for the explicit purpose of being passed into python
1608    #[doc(hidden)]
1609    pub fn clone_for_py(&self) -> Self {
1610        Self {
1611            inner: Arc::clone(&self.inner),
1612        }
1613    }
1614
1615    /// Get the join handle associated with this actor.
1616    fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1617        self.inner.cell.inner.actor_task_handle.get()
1618    }
1619
1620    /// Return this instance's sequencer.
1621    pub fn sequencer(&self) -> &Sequencer {
1622        &self.inner.sequencer
1623    }
1624
1625    /// Return this instance's ID.
1626    pub fn instance_id(&self) -> Uuid {
1627        self.inner.id
1628    }
1629}
1630
// `context::Mailbox` is implemented for instances and contexts, both owned
// and by reference. Every implementation returns the mailbox of the
// underlying instance.

impl<A: Actor> context::Mailbox for Instance<A> {
    /// The instance's own mailbox.
    fn mailbox(&self) -> &Mailbox {
        &self.inner.mailbox
    }
}

impl<A: Actor> context::Mailbox for Context<'_, A> {
    /// The mailbox of the instance this context wraps.
    fn mailbox(&self) -> &Mailbox {
        &self.instance.inner.mailbox
    }
}

impl<A: Actor> context::Mailbox for &Instance<A> {
    /// The referenced instance's mailbox.
    fn mailbox(&self) -> &Mailbox {
        &self.inner.mailbox
    }
}

impl<A: Actor> context::Mailbox for &Context<'_, A> {
    /// The mailbox of the instance the referenced context wraps.
    fn mailbox(&self) -> &Mailbox {
        &self.instance.inner.mailbox
    }
}
1654
1655impl<A: Actor> context::Actor for Instance<A> {
1656    type A = A;
1657    fn instance(&self) -> &Instance<A> {
1658        self
1659    }
1660}
1661
1662impl<A: Actor> context::Actor for Context<'_, A> {
1663    type A = A;
1664    fn instance(&self) -> &Instance<A> {
1665        self
1666    }
1667}
1668
1669impl<A: Actor> context::Actor for &Instance<A> {
1670    type A = A;
1671    fn instance(&self) -> &Instance<A> {
1672        self
1673    }
1674}
1675
1676impl<A: Actor> context::Actor for &Context<'_, A> {
1677    type A = A;
1678    fn instance(&self) -> &Instance<A> {
1679        self
1680    }
1681}
1682
1683impl Instance<()> {
1684    /// See [Mailbox::bind_actor_port] for details.
1685    pub fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1686        assert!(
1687            self.actor_task_handle().is_none(),
1688            "can only bind actor port on instance with no running actor task"
1689        );
1690        self.inner.mailbox.bind_actor_port()
1691    }
1692}
1693
/// How an actor's type is identified: either through registered type
/// information, or only by a static name string.
#[derive(Debug)]
enum ActorType {
    /// The actor type has a [`TypeInfo`]; its name is read from that info.
    Named(&'static TypeInfo),
    /// Only a static type-name string is available for the actor.
    Anonymous(&'static str),
}
1699
1700impl ActorType {
1701    /// The actor's type name.
1702    fn type_name(&self) -> &'static str {
1703        match self {
1704            Self::Named(info) => info.typename(),
1705            Self::Anonymous(name) => name,
1706        }
1707    }
1708}
1709
/// InstanceCell contains all of the type-erased, shareable state of an instance.
/// Specifically, InstanceCells form a supervision tree, and are used by
/// ActorHandle to access the underlying instance.
///
/// InstanceCell is reference counted and cloneable: cloning it only clones
/// the inner `Arc`, so all clones share the same state.
#[derive(Clone)]
pub struct InstanceCell {
    /// The shared, reference-counted cell state.
    inner: Arc<InstanceCellState>,
}
1719
1720impl fmt::Debug for InstanceCell {
1721    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1722        f.debug_struct("InstanceCell")
1723            .field("actor_id", &self.inner.actor_id)
1724            .field("actor_type", &self.inner.actor_type)
1725            .finish()
1726    }
1727}
1728
/// The state shared by all clones of an [`InstanceCell`]. When the last
/// strong reference goes away, its `Drop` impl unlinks the instance from
/// its parent and removes it from the proc's instance table.
struct InstanceCellState {
    /// The actor's id.
    actor_id: ActorId,

    /// The actor's type information (named or anonymous).
    actor_type: ActorType,

    /// The proc in which the actor is running.
    proc: Proc,

    /// Control port handles to the actor loop, if one is running: the
    /// signal port and the supervision-event port, in that order.
    actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,

    /// An observer that stores the current status of the actor.
    status: watch::Receiver<ActorStatus>,

    /// A weak reference to this instance's parent.
    parent: WeakInstanceCell,

    /// This instance's children, keyed by their PIDs.
    children: DashMap<Index, InstanceCell>,

    /// Access to the spawned actor's join handle.
    actor_task_handle: OnceLock<JoinHandle<()>>,

    /// The set of named ports that are exported by this actor, keyed by
    /// port index with the message type name as the value.
    exported_named_ports: DashMap<u64, &'static str>,

    /// The number of messages processed by this actor.
    num_processed_messages: AtomicU64,

    /// The log recording associated with this actor. It is used to
    /// store a 'flight record' of events while the actor is running.
    recording: Recording,

    /// A type-erased reference to Ports<A>, which allows us to recover
    /// an ActorHandle<A> by downcasting.
    ports: Arc<dyn Any + Send + Sync>,
}
1768
1769impl InstanceCellState {
1770    /// Unlink this instance from its parent, if it has one. If it was unlinked,
1771    /// the parent is returned.
1772    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1773        self.parent
1774            .upgrade()
1775            .filter(|parent| parent.inner.unlink(self))
1776    }
1777
1778    /// Unlink this instance from a child.
1779    fn unlink(&self, child: &InstanceCellState) -> bool {
1780        assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
1781        self.children.remove(&child.actor_id.pid()).is_some()
1782    }
1783}
1784
1785impl InstanceCell {
1786    /// Creates a new instance cell with the provided internal state. If a parent
1787    /// is provided, it is linked to this cell.
1788    fn new(
1789        actor_id: ActorId,
1790        actor_type: ActorType,
1791        proc: Proc,
1792        actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
1793        status: watch::Receiver<ActorStatus>,
1794        parent: Option<InstanceCell>,
1795        ports: Arc<dyn Any + Send + Sync>,
1796    ) -> Self {
1797        let _ais = actor_id.to_string();
1798        let cell = Self {
1799            inner: Arc::new(InstanceCellState {
1800                actor_id: actor_id.clone(),
1801                actor_type,
1802                proc: proc.clone(),
1803                actor_loop,
1804                status,
1805                parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
1806                children: DashMap::new(),
1807                actor_task_handle: OnceLock::new(),
1808                exported_named_ports: DashMap::new(),
1809                num_processed_messages: AtomicU64::new(0),
1810                recording: hyperactor_telemetry::recorder().record(64),
1811                ports,
1812            }),
1813        };
1814        cell.maybe_link_parent();
1815        proc.inner
1816            .instances
1817            .insert(actor_id.clone(), cell.downgrade());
1818        cell
1819    }
1820
1821    fn wrap(inner: Arc<InstanceCellState>) -> Self {
1822        Self { inner }
1823    }
1824
1825    /// The actor's ID.
1826    pub(crate) fn actor_id(&self) -> &ActorId {
1827        &self.inner.actor_id
1828    }
1829
1830    /// The actor's PID.
1831    pub(crate) fn pid(&self) -> Index {
1832        self.inner.actor_id.pid()
1833    }
1834
1835    /// The actor's join handle.
1836    #[allow(dead_code)]
1837    pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1838        self.inner.actor_task_handle.get()
1839    }
1840
1841    /// The instance's status observer.
1842    pub(crate) fn status(&self) -> &watch::Receiver<ActorStatus> {
1843        &self.inner.status
1844    }
1845
1846    /// Send a signal to the actor.
1847    pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
1848        if let Some((signal_port, _)) = &self.inner.actor_loop {
1849            signal_port.send(signal).map_err(ActorError::from)
1850        } else {
1851            tracing::warn!(
1852                "{}: attempted to send signal {} to detached actor",
1853                self.inner.actor_id,
1854                signal
1855            );
1856            Ok(())
1857        }
1858    }
1859
1860    /// Used by this actor's children to send a supervision event to this actor.
1861    /// When it fails to send, we will crash the process. As part of the crash,
1862    /// all the procs and actors running on this process will be terminated
1863    /// forcefully.
1864    ///
1865    /// Note that "let it crash" is the default behavior when a supervision event
1866    /// cannot be delivered upstream. It is the upstream's responsibility to
1867    /// detect and handle crashes.
1868    pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) {
1869        match &self.inner.actor_loop {
1870            Some((_, supervision_port)) => {
1871                if let Err(err) = supervision_port.send(event) {
1872                    tracing::error!(
1873                        "{}: failed to send supervision event to actor: {:?}. Crash the process.",
1874                        self.actor_id(),
1875                        err
1876                    );
1877                    std::process::exit(1);
1878                }
1879            }
1880            None => {
1881                tracing::error!(
1882                    "{}: failed: {}: cannot send supervision event to detached actor: crashing",
1883                    self.actor_id(),
1884                    event,
1885                );
1886                std::process::exit(1);
1887            }
1888        }
1889    }
1890
1891    /// Downgrade this InstanceCell to a weak reference.
1892    pub fn downgrade(&self) -> WeakInstanceCell {
1893        WeakInstanceCell {
1894            inner: Arc::downgrade(&self.inner),
1895        }
1896    }
1897
1898    /// Link this instance to a new child.
1899    fn link(&self, child: InstanceCell) {
1900        assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1901        self.inner.children.insert(child.pid(), child);
1902    }
1903
1904    /// Unlink this instance from a child.
1905    fn unlink(&self, child: &InstanceCell) {
1906        assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1907        self.inner.children.remove(&child.pid());
1908    }
1909
1910    /// Unlink this instance from all children.
1911    fn unlink_all(&self) {
1912        self.inner.children.clear();
1913    }
1914
1915    /// Link this instance to its parent, if it has one.
1916    fn maybe_link_parent(&self) {
1917        if let Some(parent) = self.inner.parent.upgrade() {
1918            parent.link(self.clone());
1919        }
1920    }
1921
1922    /// Unlink this instance from its parent, if it has one. If it was unlinked,
1923    /// the parent is returned.
1924    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1925        self.inner.maybe_unlink_parent()
1926    }
1927
1928    /// Get parent instance cell, if it exists.
1929    #[allow(dead_code)]
1930    fn get_parent_cell(&self) -> Option<InstanceCell> {
1931        self.inner.parent.upgrade()
1932    }
1933
1934    /// Return an iterator over this instance's children. This may deadlock if the
1935    /// caller already holds a reference to any item in map.
1936    fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, Index, InstanceCell>> {
1937        self.inner.children.iter()
1938    }
1939
1940    /// The number of children this instance has.
1941    fn child_count(&self) -> usize {
1942        self.inner.children.len()
1943    }
1944
1945    /// Get a child by its PID.
1946    fn get_child(&self, pid: Index) -> Option<InstanceCell> {
1947        self.inner.children.get(&pid).map(|child| child.clone())
1948    }
1949
1950    /// This is temporary so that we can share binding code between handle and instance.
1951    /// We should find some (better) way to consolidate the two.
1952    pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &Ports<A>) -> ActorRef<R> {
1953        <R as Binds<A>>::bind(ports);
1954        // All actors handle signals and undeliverable messages.
1955        ports.bind::<Signal>();
1956        ports.bind::<Undeliverable<MessageEnvelope>>();
1957        // TODO: consider sharing `ports.bound` directly.
1958        for entry in ports.bound.iter() {
1959            self.inner
1960                .exported_named_ports
1961                .insert(*entry.key(), entry.value());
1962        }
1963        ActorRef::attest(self.actor_id().clone())
1964    }
1965
1966    /// Attempt to downcast this cell to a concrete actor handle.
1967    pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
1968        let ports = Arc::clone(&self.inner.ports).downcast::<Ports<A>>().ok()?;
1969        Some(ActorHandle::new(self.clone(), ports))
1970    }
1971}
1972
1973impl Drop for InstanceCellState {
1974    fn drop(&mut self) {
1975        if let Some(parent) = self.maybe_unlink_parent() {
1976            tracing::debug!(
1977                "instance {} was dropped with parent {} still linked",
1978                self.actor_id,
1979                parent.actor_id()
1980            );
1981        }
1982        if self.proc.inner.instances.remove(&self.actor_id).is_none() {
1983            tracing::error!("instance {} was dropped but not in proc", self.actor_id);
1984        }
1985    }
1986}
1987
/// A weak version of the InstanceCell. This is used to provide cyclical
/// linkage between actors without creating a strong reference cycle.
#[derive(Debug, Clone)]
pub struct WeakInstanceCell {
    /// Weak pointer to the cell state; upgrading fails once the state is gone.
    inner: Weak<InstanceCellState>,
}
1994
1995impl Default for WeakInstanceCell {
1996    fn default() -> Self {
1997        Self::new()
1998    }
1999}
2000
2001impl WeakInstanceCell {
2002    /// Create a new weak instance cell that is never upgradeable.
2003    pub fn new() -> Self {
2004        Self { inner: Weak::new() }
2005    }
2006
2007    /// Upgrade this weak instance cell to a strong reference, if possible.
2008    pub fn upgrade(&self) -> Option<InstanceCell> {
2009        self.inner.upgrade().map(InstanceCell::wrap)
2010    }
2011}
2012
/// A polymorphic dictionary that stores ports for an actor's handlers.
/// The interface memoizes the ports so that they are reused. We do not
/// (yet) support stable identifiers across multiple instances of the same
/// actor.
pub struct Ports<A: Actor> {
    /// Memoized port handles, keyed by the `TypeId` of the message type and
    /// stored type-erased.
    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
    /// Port indices that have been bound, mapped to the bound message type name.
    bound: DashMap<u64, &'static str>,
    /// The mailbox on which ports are opened.
    mailbox: Mailbox,
    /// The sender through which handler work items are enqueued.
    workq: OrderedSender<WorkCell<A>>,
}
2023
/// A message's sequence number information.
///
/// Formats as `session_id:seq` via `Display` and parses from the same
/// form via `FromStr`.
#[derive(Serialize, Deserialize, Clone, typeuri::Named, AttrValue)]
pub struct SeqInfo {
    /// Message's session ID
    pub session_id: Uuid,
    /// Message's sequence number in the given session.
    pub seq: u64,
}
wirevalue::register_type!(SeqInfo);
2033
2034impl fmt::Display for SeqInfo {
2035    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2036        write!(f, "{}:{}", self.session_id, self.seq)
2037    }
2038}
2039
2040impl std::str::FromStr for SeqInfo {
2041    type Err = anyhow::Error;
2042
2043    fn from_str(s: &str) -> Result<Self, Self::Err> {
2044        let parts: Vec<_> = s.split(':').collect();
2045        if parts.len() != 2 {
2046            return Err(anyhow::anyhow!("invalid SeqInfo: {}", s));
2047        }
2048        let session_id: Uuid = parts[0].parse()?;
2049        let seq: u64 = parts[1].parse()?;
2050        Ok(SeqInfo { session_id, seq })
2051    }
2052}
2053
// Declares the `SEQ_INFO` attribute, whose value is a `SeqInfo`. It is read
// from message headers when a handler port enqueues work.
// NOTE(review): the doc text below mentions "the sender", but `SeqInfo` only
// carries a session ID and a sequence number — confirm the intended wording.
declare_attrs! {
    /// The sender of this message, the session ID, and the message's sequence
    /// number assigned by this session.
    pub attr SEQ_INFO: SeqInfo;
}
2059
impl<A: Actor> Ports<A> {
    /// Create an empty port dictionary that opens ports on `mailbox` and
    /// enqueues handler work on `workq`.
    fn new(mailbox: Mailbox, workq: OrderedSender<WorkCell<A>>) -> Self {
        Self {
            ports: DashMap::new(),
            bound: DashMap::new(),
            mailbox,
            workq,
        }
    }

    /// Get a port for the Handler<M> of actor A.
    ///
    /// The port is created on first use and memoized by the `TypeId` of `M`;
    /// later calls return a clone of the stored handle.
    ///
    /// # Panics
    ///
    /// Panics if `M` is `Signal` and no port for it has been stored yet, or
    /// if the stored entry for `M` is not a `PortHandle<M>`.
    pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
    where
        A: Handler<M>,
    {
        let key = TypeId::of::<M>();
        match self.ports.entry(key) {
            Entry::Vacant(entry) => {
                // Some special case hackery, but it keeps the rest of the code (relatively) simple.
                assert_ne!(
                    key,
                    TypeId::of::<Signal>(),
                    "cannot provision Signal port through `Ports::get`"
                );

                let type_info = TypeInfo::get_by_typeid(key);
                let workq = self.workq.clone();
                let actor_id = self.mailbox.actor_id().to_string();
                // Each delivered message is wrapped into a `WorkCell` that
                // invokes the actor's handler, then pushed onto the work queue.
                let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
                    // Read the sequencing info before `headers` is moved into the work item.
                    let seq_info = headers.get(SEQ_INFO).cloned();

                    let work = WorkCell::new(move |actor: &mut A, instance: &Instance<A>| {
                        Box::pin(async move {
                            // SAFETY: we guarantee that the passed type_info is for type M.
                            unsafe {
                                instance
                                    .handle_message(actor, type_info, headers, msg)
                                    .await
                            }
                        })
                    });
                    ACTOR_MESSAGE_QUEUE_SIZE.add(
                        1,
                        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.clone()),
                    );
                    if workq.enable_buffering {
                        let SeqInfo { session_id, seq } =
                            seq_info.expect("SEQ_INFO must be set when buffering is enabled");

                        // TODO: return the message contained in the error instead of dropping them when converting
                        // to anyhow::Error. In that way, the message can be picked up by mailbox and returned to sender.
                        workq.send(session_id, seq, work).map_err(|e| match e {
                            OrderedSenderError::InvalidZeroSeq(_) => {
                                anyhow::anyhow!("seq must be greater than 0")
                            }
                            OrderedSenderError::SendError(e) => anyhow::Error::from(e),
                            OrderedSenderError::FlushError(e) => e,
                        })
                    } else {
                        // Without buffering, the session/sequence info is not consulted.
                        workq.direct_send(work).map_err(anyhow::Error::from)
                    }
                });
                entry.insert(Box::new(port.clone()));
                port
            }
            Entry::Occupied(entry) => {
                // Entries are inserted under `TypeId::of::<M>()` as a boxed
                // `PortHandle<M>` (here and in `open_message_port`), so the
                // downcast is expected to succeed.
                let port = entry.get();
                port.downcast_ref::<PortHandle<M>>().unwrap().clone()
            }
        }
    }

    /// Open a (typed) message port as in [`get`], but return a port receiver instead of dispatching
    /// the underlying handler.
    ///
    /// Returns `None` if a port for `M` has already been stored.
    pub(crate) fn open_message_port<M: Message>(&self) -> Option<(PortHandle<M>, PortReceiver<M>)> {
        match self.ports.entry(TypeId::of::<M>()) {
            Entry::Vacant(entry) => {
                let (port, receiver) = self.mailbox.open_port();
                entry.insert(Box::new(port.clone()));
                Some((port, receiver))
            }
            Entry::Occupied(_) => None,
        }
    }

    /// Bind the given message type to its actor port.
    ///
    /// Binding the same message type again is a no-op.
    ///
    /// # Panics
    ///
    /// Panics if the port index of `M` is already bound to a different
    /// message type name.
    pub fn bind<M: RemoteMessage>(&self)
    where
        A: Handler<M>,
    {
        let port_index = M::port();
        match self.bound.entry(port_index) {
            Entry::Vacant(entry) => {
                self.get::<M>().bind_actor_port();
                entry.insert(M::typename());
            }
            Entry::Occupied(entry) => {
                assert_eq!(
                    *entry.get(),
                    M::typename(),
                    "bind {}: port index {} already bound to type {}",
                    M::typename(),
                    port_index,
                    entry.get(),
                );
            }
        }
    }
}
2169
2170#[cfg(test)]
2171mod tests {
2172    use std::assert_matches::assert_matches;
2173    use std::sync::atomic::AtomicBool;
2174
2175    use hyperactor_macros::export;
2176    use maplit::hashmap;
2177    use serde_json::json;
2178    use timed_test::async_timed_test;
2179    use tokio::sync::Barrier;
2180    use tokio::sync::oneshot;
2181    use tracing::Level;
2182    use tracing_subscriber::layer::SubscriberExt;
2183    use tracing_test::internal::logs_with_scope_contain;
2184
2185    use super::*;
2186    // needed for in-crate macro expansion
2187    use crate as hyperactor;
2188    use crate::HandleClient;
2189    use crate::Handler;
2190    use crate::OncePortRef;
2191    use crate::PortRef;
2192    use crate::clock::RealClock;
2193    use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
2194    use crate::test_utils::process_assertion::assert_termination;
2195
2196    impl ActorTreeSnapshot {
2197        #[allow(dead_code)]
2198        fn empty(pid: Index) -> Self {
2199            Self {
2200                pid,
2201                type_name: String::new(),
2202                status: ActorStatus::Idle,
2203                stats: ActorStats::default(),
2204                handlers: HashMap::new(),
2205                children: HashMap::new(),
2206                events: Vec::new(),
2207                spans: Vec::new(),
2208            }
2209        }
2210
2211        fn empty_typed(pid: Index, type_name: String) -> Self {
2212            Self {
2213                pid,
2214                type_name,
2215                status: ActorStatus::Idle,
2216                stats: ActorStats::default(),
2217                handlers: HashMap::new(),
2218                children: HashMap::new(),
2219                events: Vec::new(),
2220                spans: Vec::new(),
2221            }
2222        }
2223    }
2224
    // Stateless actor used by the tests in this module; it handles
    // `TestActorMessage`.
    #[derive(Debug, Default)]
    #[export]
    struct TestActor;
2228
    // No overrides: `TestActor` relies entirely on the `Actor` trait's defaults.
    impl Actor for TestActor {}
2230
    // Messages understood by `TestActor`; see the handler impl for behavior.
    #[derive(Handler, HandleClient, Debug)]
    enum TestActorMessage {
        // Send `()` on the given sender.
        Reply(oneshot::Sender<()>),
        // Send `()` on the sender, then wait on the receiver before returning.
        Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
        // Send the boxed message on to the given actor.
        Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
        // Do nothing.
        Noop(),
        // Return the given error from the handler.
        Fail(anyhow::Error),
        // Panic with the given message.
        Panic(String),
        // Spawn a `TestActor` from the handler's context and send back its handle.
        Spawn(oneshot::Sender<ActorHandle<TestActor>>),
    }
2241
2242    impl TestActor {
2243        async fn spawn_child(
2244            cx: &impl context::Actor,
2245            parent: &ActorHandle<TestActor>,
2246        ) -> ActorHandle<TestActor> {
2247            let (tx, rx) = oneshot::channel();
2248            parent.send(cx, TestActorMessage::Spawn(tx)).unwrap();
2249            rx.await.unwrap()
2250        }
2251    }
2252
    // Handler implementations for each `TestActorMessage` variant.
    #[async_trait]
    #[crate::forward(TestActorMessage)]
    impl TestActorMessageHandler for TestActor {
        // Acknowledge by sending `()`; panics if the receiver is gone.
        async fn reply(
            &mut self,
            _cx: &crate::Context<Self>,
            sender: oneshot::Sender<()>,
        ) -> Result<(), anyhow::Error> {
            sender.send(()).unwrap();
            Ok(())
        }

        // Announce entry via `sender`, then stay in the handler until
        // `receiver` yields.
        async fn wait(
            &mut self,
            _cx: &crate::Context<Self>,
            sender: oneshot::Sender<()>,
            receiver: oneshot::Receiver<()>,
        ) -> Result<(), anyhow::Error> {
            sender.send(()).unwrap();
            receiver.await.unwrap();
            Ok(())
        }

        // Pass `message` along to `destination`, propagating any send error.
        async fn forward(
            &mut self,
            cx: &crate::Context<Self>,
            destination: ActorHandle<TestActor>,
            message: Box<TestActorMessage>,
        ) -> Result<(), anyhow::Error> {
            // TODO: this needn't be async
            destination.send(cx, *message)?;
            Ok(())
        }

        // Succeed without doing anything.
        async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
            Ok(())
        }

        // Fail the handler with the supplied error.
        async fn fail(
            &mut self,
            _cx: &crate::Context<Self>,
            err: anyhow::Error,
        ) -> Result<(), anyhow::Error> {
            Err(err)
        }

        // Panic inside the handler with the supplied message.
        async fn panic(
            &mut self,
            _cx: &crate::Context<Self>,
            err_msg: String,
        ) -> Result<(), anyhow::Error> {
            panic!("{}", err_msg);
        }

        // Spawn a `TestActor` using this handler's context and send its
        // handle back on `reply`.
        async fn spawn(
            &mut self,
            cx: &crate::Context<Self>,
            reply: oneshot::Sender<ActorHandle<TestActor>>,
        ) -> Result<(), anyhow::Error> {
            let handle = TestActor.spawn(cx)?;
            reply.send(handle).unwrap();
            Ok(())
        }
    }
2317
    // Spawns a single actor and follows its status through
    // Idle -> Processing -> Idle -> Stopped, also checking that the spawn
    // was logged together with the actor's join handle.
    #[tracing_test::traced_test]
    #[async_timed_test(timeout_secs = 30)]
    async fn test_spawn_actor() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let handle = proc.spawn("test", TestActor).unwrap();

        // Check on the join handle.
        assert!(logs_contain(
            format!(
                "{}: spawned with {:?}",
                handle.actor_id(),
                handle.cell().actor_task_handle().unwrap(),
            )
            .as_str()
        ));

        let mut state = handle.status().clone();

        // Send a ping-pong to the actor. Wait for the actor to become idle.

        let (tx, rx) = oneshot::channel::<()>();
        handle.send(&client, TestActorMessage::Reply(tx)).unwrap();
        rx.await.unwrap();

        state
            .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
            .await
            .unwrap();

        // Make sure we enter processing state while the actor is handling a message.
        let (enter_tx, enter_rx) = oneshot::channel::<()>();
        let (exit_tx, exit_rx) = oneshot::channel::<()>();

        handle
            .send(&client, TestActorMessage::Wait(enter_tx, exit_rx))
            .unwrap();
        // The handler signals `enter_tx` and then waits on `exit_rx`, so
        // the actor is still inside the handler here.
        enter_rx.await.unwrap();
        assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= RealClock.system_time_now());
        // Release the handler so the actor can return to idle.
        exit_tx.send(()).unwrap();

        state
            .wait_for(|state| matches!(*state, ActorStatus::Idle))
            .await
            .unwrap();

        // Stop the actor and wait for it to finish.
        handle.drain_and_stop().unwrap();
        handle.await;
        assert_matches!(*state.borrow(), ActorStatus::Stopped);
    }
2368
2369    #[async_timed_test(timeout_secs = 30)]
2370    async fn test_proc_actors_messaging() {
2371        let proc = Proc::local();
2372        let (client, _) = proc.instance("client").unwrap();
2373        let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
2374        let second = proc.spawn::<TestActor>("second", TestActor).unwrap();
2375        let (tx, rx) = oneshot::channel::<()>();
2376        let reply_message = TestActorMessage::Reply(tx);
2377        first
2378            .send(
2379                &client,
2380                TestActorMessage::Forward(second, Box::new(reply_message)),
2381            )
2382            .unwrap();
2383        rx.await.unwrap();
2384    }
2385
    // Stateless actor that answers `LookupTestMessage` queries.
    #[derive(Debug, Default)]
    #[export]
    struct LookupTestActor;
2389
    // No overrides: `LookupTestActor` relies entirely on the `Actor` trait's defaults.
    impl Actor for LookupTestActor {}
2391
    // Messages understood by `LookupTestActor`.
    #[derive(Handler, HandleClient, Debug)]
    enum LookupTestMessage {
        // Ask whether the given actor ref can be downcast to a handle from
        // the handler's context; the boolean answer goes to the reply port.
        ActorExists(ActorRef<TestActor>, #[reply] OncePortRef<bool>),
    }
2396
2397    #[async_trait]
2398    #[crate::forward(LookupTestMessage)]
2399    impl LookupTestMessageHandler for LookupTestActor {
2400        async fn actor_exists(
2401            &mut self,
2402            cx: &crate::Context<Self>,
2403            actor_ref: ActorRef<TestActor>,
2404        ) -> Result<bool, anyhow::Error> {
2405            Ok(actor_ref.downcast_handle(cx).is_some())
2406        }
2407    }
2408
    // Checks that an actor ref resolves to a handle only for a running
    // actor of the matching type.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_actor_lookup() {
        let proc = Proc::local();
        let (client, _handle) = proc.instance("client").unwrap();

        let target_actor = proc.spawn::<TestActor>("target", TestActor).unwrap();
        let target_actor_ref = target_actor.bind();
        let lookup_actor = proc
            .spawn::<LookupTestActor>("lookup", LookupTestActor)
            .unwrap();

        // A running actor of the right type is found.
        assert!(
            lookup_actor
                .actor_exists(&client, target_actor_ref.clone())
                .await
                .unwrap()
        );

        // Make up a child actor. It shouldn't exist.
        assert!(
            !lookup_actor
                .actor_exists(
                    &client,
                    ActorRef::attest(target_actor.actor_id().child_id(123).clone())
                )
                .await
                .unwrap()
        );
        // A wrongly-typed actor ref should also not obtain.
        assert!(
            !lookup_actor
                .actor_exists(&client, ActorRef::attest(lookup_actor.actor_id().clone()))
                .await
                .unwrap()
        );

        target_actor.drain_and_stop().unwrap();
        target_actor.await;

        // Once the target has stopped, its ref no longer resolves.
        assert!(
            !lookup_actor
                .actor_exists(&client, target_actor_ref)
                .await
                .unwrap()
        );

        lookup_actor.drain_and_stop().unwrap();
        lookup_actor.await;
    }
2458
2459    fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
2460        assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id());
2461        assert_eq!(
2462            child.inner.parent.upgrade().unwrap().actor_id(),
2463            parent.actor_id()
2464        );
2465        assert_matches!(
2466            parent.inner.children.get(&child.pid()),
2467            Some(node) if node.actor_id() == child.actor_id()
2468        );
2469    }
2470
    // Builds a three-level chain (first -> second -> third) and checks the
    // spawn logs, the allocated actor ids, the parent/child links, and that
    // each actor has no linked children after it stops.
    #[tracing_test::traced_test]
    #[async_timed_test(timeout_secs = 30)]
    async fn test_spawn_child() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();

        let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
        let second = TestActor::spawn_child(&client, &first).await;
        let third = TestActor::spawn_child(&client, &second).await;

        // Check we've got the join handles.
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                first.actor_id(),
                first.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                second.actor_id(),
                second.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                third.actor_id(),
                third.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));

        // These are allocated in sequence:
        // (both descendants are expressed as child ids of `first`'s actor id)
        assert_eq!(first.actor_id().proc_id(), proc.proc_id());
        assert_eq!(second.actor_id(), &first.actor_id().child_id(1));
        assert_eq!(third.actor_id(), &first.actor_id().child_id(2));

        // Supervision tree is constructed correctly.
        validate_link(third.cell(), second.cell());
        validate_link(second.cell(), first.cell());
        assert!(first.cell().inner.parent.upgrade().is_none());

        // Supervision tree is torn down correctly.
        // Once each actor is stopped, it should have no linked children.
        let third_cell = third.cell().clone();
        third.drain_and_stop().unwrap();
        third.await;
        assert!(third_cell.inner.children.is_empty());
        drop(third_cell);
        // Stopping the leaf must not disturb the link above it.
        validate_link(second.cell(), first.cell());

        let second_cell = second.cell().clone();
        second.drain_and_stop().unwrap();
        second.await;
        assert!(second_cell.inner.children.is_empty());
        drop(second_cell);

        let first_cell = first.cell().clone();
        first.drain_and_stop().unwrap();
        first.await;
        assert!(first_cell.inner.children.is_empty());
    }
2540
2541    #[async_timed_test(timeout_secs = 30)]
2542    async fn test_child_lifecycle() {
2543        let proc = Proc::local();
2544        let (client, _) = proc.instance("client").unwrap();
2545
2546        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
2547        let root_1 = TestActor::spawn_child(&client, &root).await;
2548        let root_2 = TestActor::spawn_child(&client, &root).await;
2549        let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
2550
2551        root.drain_and_stop().unwrap();
2552        root.await;
2553
2554        for actor in [root_1, root_2, root_2_1] {
2555            assert!(actor.send(&client, TestActorMessage::Noop()).is_err());
2556            assert_matches!(actor.await, ActorStatus::Stopped);
2557        }
2558    }
2559
2560    #[async_timed_test(timeout_secs = 30)]
2561    async fn test_parent_failure() {
2562        let proc = Proc::local();
2563        let (client, _) = proc.instance("client").unwrap();
2564        // Need to set a supervison coordinator for this Proc because there will
2565        // be actor failure(s) in this test which trigger supervision.
2566        ProcSupervisionCoordinator::set(&proc).await.unwrap();
2567
2568        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
2569        let root_1 = TestActor::spawn_child(&client, &root).await;
2570        let root_2 = TestActor::spawn_child(&client, &root).await;
2571        let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
2572
2573        root_2
2574            .send(
2575                &client,
2576                TestActorMessage::Fail(anyhow::anyhow!("some random failure")),
2577            )
2578            .unwrap();
2579        let _root_2_actor_id = root_2.actor_id().clone();
2580        assert_matches!(
2581            root_2.await,
2582            ActorStatus::Failed(err) if err.to_string() == "some random failure"
2583        );
2584
2585        // TODO: should we provide finer-grained stop reasons, e.g., to indicate it was
2586        // stopped by a parent failure?
2587        // Currently the parent fails with an error related to the child's failure.
2588        assert_matches!(
2589            root.await,
2590            ActorStatus::Failed(err) if err.to_string().contains("some random failure")
2591        );
2592        assert_eq!(root_2_1.await, ActorStatus::Stopped);
2593        assert_eq!(root_1.await, ActorStatus::Stopped);
2594    }
2595
2596    #[async_timed_test(timeout_secs = 30)]
2597    // TODO: The snapshot has a flaky num_messages_processed count after stopping
2598    // root_1, but only on Github. The test expects 3, sometimes the result is 2.
2599    #[cfg_attr(not(fbcode_build), ignore)]
2600    async fn test_actor_ledger() {
2601        async fn wait_until_idle(actor_handle: &ActorHandle<TestActor>) {
2602            actor_handle
2603                .status()
2604                .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
2605                .await
2606                .unwrap();
2607        }
2608
2609        let proc = Proc::local();
2610        let (client, _) = proc.instance("client").unwrap();
2611
2612        // Add the 1st root. This root will remain active until the end of the test.
2613        let root: ActorHandle<TestActor> = proc.spawn("root", TestActor).unwrap();
2614        wait_until_idle(&root).await;
2615        {
2616            let snapshot = proc.state().ledger.snapshot();
2617            assert_eq!(
2618                snapshot.roots,
2619                hashmap! {
2620                    root.actor_id().clone() =>
2621                        ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2622                },
2623            );
2624        }
2625
2626        // Add the 2nd root.
2627        let another_root: ActorHandle<TestActor> = proc.spawn("another_root", TestActor).unwrap();
2628        wait_until_idle(&another_root).await;
2629        {
2630            let snapshot = proc.state().ledger.snapshot();
2631            assert_eq!(
2632                snapshot.roots,
2633                hashmap! {
2634                    root.actor_id().clone() =>
2635                        ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2636                    another_root.actor_id().clone() =>
2637                        ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2638                },
2639            );
2640        }
2641
2642        // Stop the 2nd root. It should be excluded from the snapshot after it
2643        // is stopped.
2644        another_root.drain_and_stop().unwrap();
2645        another_root.await;
2646        {
2647            let snapshot = proc.state().ledger.snapshot();
2648            assert_eq!(
2649                snapshot.roots,
2650                hashmap! { root.actor_id().clone() =>
2651                    ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2652                },
2653            );
2654        }
2655
2656        // Incrementally add the following children tree to root. This tree
2657        // should be captured by snapshot.
2658        //     root -> root_1 -> root_1_1
2659        //         |-> root_2
2660
2661        let root_1 = TestActor::spawn_child(&client, &root).await;
2662        wait_until_idle(&root_1).await;
2663        // Wait until the root actor processes the message and is then idle again.
2664        wait_until_idle(&root).await;
2665        {
2666            let snapshot = proc.state().ledger.snapshot();
2667            assert_eq!(
2668                snapshot.roots,
2669                hashmap! {
2670                    root.actor_id().clone() =>  ActorTreeSnapshot {
2671                        pid: 0,
2672                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2673                        status: ActorStatus::Idle,
2674                        stats: ActorStats { num_processed_messages: 1 },
2675                        handlers: HashMap::new(),
2676                        children: hashmap! {
2677                            root_1.actor_id().pid() =>
2678                                ActorTreeSnapshot::empty_typed(
2679                                    root_1.actor_id().pid(),
2680                                    "hyperactor::proc::tests::TestActor".to_string()
2681                                )
2682                        },
2683                        events: Vec::new(),
2684                        spans: Vec::new(),
2685                    }
2686                },
2687            );
2688        }
2689
2690        let root_1_1 = TestActor::spawn_child(&client, &root_1).await;
2691        wait_until_idle(&root_1_1).await;
2692        wait_until_idle(&root_1).await;
2693        {
2694            let snapshot = proc.state().ledger.snapshot();
2695            assert_eq!(
2696                snapshot.roots,
2697                hashmap! {
2698                    root.actor_id().clone() =>  ActorTreeSnapshot {
2699                        pid: 0,
2700                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2701                        status: ActorStatus::Idle,
2702                        stats: ActorStats { num_processed_messages: 1 },
2703                        handlers: HashMap::new(),
2704                        children: hashmap!{
2705                            root_1.actor_id().pid() =>
2706                                ActorTreeSnapshot {
2707                                    pid: root_1.actor_id().pid(),
2708                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2709                                    status: ActorStatus::Idle,
2710                                    stats: ActorStats { num_processed_messages: 1 },
2711                                    handlers: HashMap::new(),
2712                                    children: hashmap!{
2713                                        root_1_1.actor_id().pid() =>
2714                                            ActorTreeSnapshot::empty_typed(
2715                                                root_1_1.actor_id().pid(),
2716                                                "hyperactor::proc::tests::TestActor".to_string()
2717                                            )
2718                                    },
2719                                    events: Vec::new(),
2720                                    spans: Vec::new(),
2721                                }
2722                        },
2723                        events: Vec::new(),
2724                        spans: Vec::new(),
2725                    },
2726                }
2727            );
2728        }
2729
2730        let root_2 = TestActor::spawn_child(&client, &root).await;
2731        wait_until_idle(&root_2).await;
2732        wait_until_idle(&root).await;
2733        {
2734            let snapshot = proc.state().ledger.snapshot();
2735            assert_eq!(
2736                snapshot.roots,
2737                hashmap! {
2738                    root.actor_id().clone() =>  ActorTreeSnapshot {
2739                        pid: 0,
2740                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2741                        status: ActorStatus::Idle,
2742                        stats: ActorStats { num_processed_messages: 2 },
2743                        handlers: HashMap::new(),
2744                        children: hashmap!{
2745                            root_2.actor_id().pid() =>
2746                                ActorTreeSnapshot{
2747                                    pid: root_2.actor_id().pid(),
2748                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2749                                    status: ActorStatus::Idle,
2750                                    stats: ActorStats::default(),
2751                                    handlers: HashMap::new(),
2752                                    children: HashMap::new(),
2753                                    events: Vec::new(),
2754                                    spans: Vec::new(),
2755                                },
2756                            root_1.actor_id().pid() =>
2757                                ActorTreeSnapshot{
2758                                    pid: root_1.actor_id().pid(),
2759                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2760                                    status: ActorStatus::Idle,
2761                                    stats: ActorStats { num_processed_messages: 1 },
2762                                    handlers: HashMap::new(),
2763                                    children: hashmap!{
2764                                        root_1_1.actor_id().pid() =>
2765                                            ActorTreeSnapshot::empty_typed(
2766                                                root_1_1.actor_id().pid(),
2767                                                "hyperactor::proc::tests::TestActor".to_string()
2768                                            )
2769                                    },
2770                                    events: Vec::new(),
2771                                    spans: Vec::new(),
2772                                },
2773                        },
2774                        events: Vec::new(),
2775                        spans: Vec::new(),
2776                    },
2777                }
2778            );
2779        }
2780
2781        // Stop root_1. This should remove it, and its child, from snapshot.
2782        root_1.drain_and_stop().unwrap();
2783        root_1.await;
2784        // root also needs to stop processing messages to get a reliable number.
2785        wait_until_idle(&root).await;
2786        {
2787            let snapshot = proc.state().ledger.snapshot();
2788            assert_eq!(
2789                snapshot.roots,
2790                hashmap! {
2791                    root.actor_id().clone() =>  ActorTreeSnapshot {
2792                        pid: 0,
2793                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2794                        status: ActorStatus::Idle,
2795                        stats: ActorStats { num_processed_messages: 3 },
2796                        handlers: HashMap::new(),
2797                        children: hashmap!{
2798                            root_2.actor_id().pid() =>
2799                                ActorTreeSnapshot {
2800                                    pid: root_2.actor_id().pid(),
2801                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2802                                    status: ActorStatus::Idle,
2803                                    stats: ActorStats::default(),
2804                                    handlers: HashMap::new(),
2805                                    children: HashMap::new(),
2806                                    events: Vec::new(),
2807                                    spans: Vec::new(),
2808                                }
2809                        },
2810                        events: Vec::new(),
2811                        spans: Vec::new(),
2812                    },
2813                }
2814            );
2815        }
2816
2817        // Finally stop root. No roots should be left in snapshot.
2818        root.drain_and_stop().unwrap();
2819        root.await;
2820        {
2821            let snapshot = proc.state().ledger.snapshot();
2822            assert_eq!(snapshot.roots, hashmap! {});
2823        }
2824    }
2825
2826    #[async_timed_test(timeout_secs = 30)]
2827    async fn test_multi_handler() {
2828        // TEMPORARY: This test is currently a bit awkward since we don't yet expose
2829        // public interfaces to multi-handlers. This will be fixed shortly.
2830
2831        #[derive(Debug)]
2832        struct TestActor(Arc<AtomicUsize>);
2833
2834        #[async_trait]
2835        impl Actor for TestActor {}
2836
2837        #[async_trait]
2838        impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
2839            async fn handle(
2840                &mut self,
2841                cx: &crate::Context<Self>,
2842                message: OncePortHandle<PortHandle<usize>>,
2843            ) -> anyhow::Result<()> {
2844                message.send(cx.port())?;
2845                Ok(())
2846            }
2847        }
2848
2849        #[async_trait]
2850        impl Handler<usize> for TestActor {
2851            async fn handle(
2852                &mut self,
2853                _cx: &crate::Context<Self>,
2854                message: usize,
2855            ) -> anyhow::Result<()> {
2856                self.0.fetch_add(message, Ordering::SeqCst);
2857                Ok(())
2858            }
2859        }
2860
2861        let proc = Proc::local();
2862        let state = Arc::new(AtomicUsize::new(0));
2863        let actor = TestActor(state.clone());
2864        let handle = proc.spawn::<TestActor>("test", actor).unwrap();
2865        let (client, _) = proc.instance("client").unwrap();
2866        let (tx, rx) = client.open_once_port();
2867        handle.send(&client, tx).unwrap();
2868        let usize_handle = rx.recv().await.unwrap();
2869        usize_handle.send(123).unwrap();
2870
2871        handle.drain_and_stop().unwrap();
2872        handle.await;
2873
2874        assert_eq!(state.load(Ordering::SeqCst), 123);
2875    }
2876
2877    #[async_timed_test(timeout_secs = 30)]
2878    async fn test_actor_panic() {
2879        // Need this custom hook to store panic backtrace in task_local.
2880        panic_handler::set_panic_hook();
2881
2882        let proc = Proc::local();
2883        // Need to set a supervison coordinator for this Proc because there will
2884        // be actor failure(s) in this test which trigger supervision.
2885        ProcSupervisionCoordinator::set(&proc).await.unwrap();
2886
2887        let (client, _handle) = proc.instance("client").unwrap();
2888        let actor_handle = proc.spawn("test", TestActor).unwrap();
2889        actor_handle
2890            .panic(&client, "some random failure".to_string())
2891            .await
2892            .unwrap();
2893        let actor_status = actor_handle.await;
2894
2895        // Note: even when the test passes, the panic stacktrace will still be
2896        // printed to stderr because that is the behavior controlled by the panic
2897        // hook.
2898        assert_matches!(actor_status, ActorStatus::Failed(_));
2899        if let ActorStatus::Failed(err) = actor_status {
2900            let error_msg = err.to_string();
2901            // Verify panic message is captured
2902            assert!(error_msg.contains("some random failure"));
2903            // Verify backtrace is captured. Note the backtrace message might
2904            // change in the future. If that happens, we need to update this
2905            // statement with something up-to-date.
2906            assert!(error_msg.contains("rust_begin_unwind"));
2907        }
2908    }
2909
2910    #[async_timed_test(timeout_secs = 30)]
2911    async fn test_local_supervision_propagation() {
2912        hyperactor_telemetry::initialize_logging_for_test();
2913
2914        #[derive(Debug)]
2915        struct TestActor(Arc<AtomicBool>, bool);
2916
2917        #[async_trait]
2918        impl Actor for TestActor {
2919            async fn handle_supervision_event(
2920                &mut self,
2921                _this: &Instance<Self>,
2922                _event: &ActorSupervisionEvent,
2923            ) -> Result<bool, anyhow::Error> {
2924                if !self.1 {
2925                    return Ok(false);
2926                }
2927
2928                tracing::error!(
2929                    "{}: supervision event received: {:?}",
2930                    _this.self_id(),
2931                    _event
2932                );
2933                self.0.store(true, Ordering::SeqCst);
2934                Ok(true)
2935            }
2936        }
2937
2938        #[async_trait]
2939        impl Handler<String> for TestActor {
2940            async fn handle(
2941                &mut self,
2942                cx: &crate::Context<Self>,
2943                message: String,
2944            ) -> anyhow::Result<()> {
2945                tracing::info!("{} received message: {}", cx.self_id(), message);
2946                Err(anyhow::anyhow!(message))
2947            }
2948        }
2949
2950        let proc = Proc::local();
2951        let (client, _) = proc.instance("client").unwrap();
2952        let reported_event = ProcSupervisionCoordinator::set(&proc).await.unwrap();
2953
2954        let root_state = Arc::new(AtomicBool::new(false));
2955        let root_1_state = Arc::new(AtomicBool::new(false));
2956        let root_1_1_state = Arc::new(AtomicBool::new(false));
2957        let root_1_1_1_state = Arc::new(AtomicBool::new(false));
2958        let root_2_state = Arc::new(AtomicBool::new(false));
2959        let root_2_1_state = Arc::new(AtomicBool::new(false));
2960
2961        let root = proc
2962            .spawn::<TestActor>("root", TestActor(root_state.clone(), false))
2963            .unwrap();
2964        let root_1 = proc
2965            .spawn_child::<TestActor>(
2966                root.cell().clone(),
2967                TestActor(
2968                    root_1_state.clone(),
2969                    true, /* set true so children's event stops here */
2970                ),
2971            )
2972            .unwrap();
2973        let root_1_1 = proc
2974            .spawn_child::<TestActor>(
2975                root_1.cell().clone(),
2976                TestActor(root_1_1_state.clone(), false),
2977            )
2978            .unwrap();
2979        let root_1_1_1 = proc
2980            .spawn_child::<TestActor>(
2981                root_1_1.cell().clone(),
2982                TestActor(root_1_1_1_state.clone(), false),
2983            )
2984            .unwrap();
2985        let root_2 = proc
2986            .spawn_child::<TestActor>(root.cell().clone(), TestActor(root_2_state.clone(), false))
2987            .unwrap();
2988        let root_2_1 = proc
2989            .spawn_child::<TestActor>(
2990                root_2.cell().clone(),
2991                TestActor(root_2_1_state.clone(), false),
2992            )
2993            .unwrap();
2994
2995        // fail `root_1_1_1`, the supervision msg should be propagated to
2996        // `root_1` because `root_1` has set `true` to `handle_supervision_event`.
2997        root_1_1_1
2998            .send::<String>(&client, "some random failure".into())
2999            .unwrap();
3000
3001        // fail `root_2_1`, the supervision msg should be propagated to
3002        // ProcSupervisionCoordinator.
3003        root_2_1
3004            .send::<String>(&client, "some random failure".into())
3005            .unwrap();
3006
3007        RealClock.sleep(Duration::from_secs(1)).await;
3008
3009        assert!(!root_state.load(Ordering::SeqCst));
3010        assert!(root_1_state.load(Ordering::SeqCst));
3011        assert!(!root_1_1_state.load(Ordering::SeqCst));
3012        assert!(!root_1_1_1_state.load(Ordering::SeqCst));
3013        assert!(!root_2_state.load(Ordering::SeqCst));
3014        assert!(!root_2_1_state.load(Ordering::SeqCst));
3015        assert_eq!(
3016            reported_event.event().map(|e| e.actor_id.clone()),
3017            Some(root_2_1.actor_id().clone())
3018        );
3019    }
3020
3021    #[async_timed_test(timeout_secs = 30)]
3022    async fn test_instance() {
3023        #[derive(Debug, Default)]
3024        struct TestActor;
3025
3026        impl Actor for TestActor {}
3027
3028        #[async_trait]
3029        impl Handler<(String, PortRef<String>)> for TestActor {
3030            async fn handle(
3031                &mut self,
3032                cx: &crate::Context<Self>,
3033                (message, port): (String, PortRef<String>),
3034            ) -> anyhow::Result<()> {
3035                port.send(cx, message)?;
3036                Ok(())
3037            }
3038        }
3039
3040        let proc = Proc::local();
3041
3042        let (instance, handle) = proc.instance("my_test_actor").unwrap();
3043
3044        let child_actor = TestActor.spawn(&instance).unwrap();
3045
3046        let (port, mut receiver) = instance.open_port();
3047        child_actor
3048            .send(&instance, ("hello".to_string(), port.bind()))
3049            .unwrap();
3050
3051        let message = receiver.recv().await.unwrap();
3052        assert_eq!(message, "hello");
3053
3054        child_actor.drain_and_stop().unwrap();
3055        child_actor.await;
3056
3057        assert_eq!(*handle.status().borrow(), ActorStatus::Client);
3058        drop(instance);
3059        assert_eq!(*handle.status().borrow(), ActorStatus::Stopped);
3060        handle.await;
3061    }
3062
3063    #[tokio::test]
3064    async fn test_proc_terminate_without_coordinator() {
3065        if std::env::var("CARGO_TEST").is_ok() {
3066            eprintln!("test skipped as it hangs when run by cargo in sandcastle");
3067            return;
3068        }
3069
3070        let process = async {
3071            let proc = Proc::local();
3072            // Intentionally not setting a proc supervison coordinator. This
3073            // should cause the process to terminate.
3074            // ProcSupervisionCoordinator::set(&proc).await.unwrap();
3075            let root = proc.spawn("root", TestActor).unwrap();
3076            let (client, _handle) = proc.instance("client").unwrap();
3077            root.fail(&client, anyhow::anyhow!("some random failure"))
3078                .await
3079                .unwrap();
3080            // It is okay to sleep a long time here, because we expect this
3081            // process to be terminated way before the sleep ends due to the
3082            // missing proc supervison coordinator.
3083            RealClock.sleep(Duration::from_secs(30)).await;
3084        };
3085
3086        assert_termination(|| process, 1).await.unwrap();
3087    }
3088
3089    fn trace_and_block(fut: impl Future) {
3090        tracing::subscriber::with_default(
3091            tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
3092            || {
3093                tokio::runtime::Builder::new_current_thread()
3094                    .enable_all()
3095                    .build()
3096                    .unwrap()
3097                    .block_on(fut)
3098            },
3099        );
3100    }
3101
3102    #[ignore = "until trace recording is turned back on"]
3103    #[test]
3104    fn test_handler_logging() {
3105        #[derive(Debug, Default)]
3106        struct LoggingActor;
3107
3108        impl Actor for LoggingActor {}
3109
3110        impl LoggingActor {
3111            async fn wait(cx: &impl context::Actor, handle: &ActorHandle<Self>) {
3112                let barrier = Arc::new(Barrier::new(2));
3113                handle.send(cx, barrier.clone()).unwrap();
3114                barrier.wait().await;
3115            }
3116        }
3117
3118        #[async_trait]
3119        impl Handler<String> for LoggingActor {
3120            async fn handle(
3121                &mut self,
3122                _cx: &crate::Context<Self>,
3123                message: String,
3124            ) -> anyhow::Result<()> {
3125                tracing::info!("{}", message);
3126                Ok(())
3127            }
3128        }
3129
3130        #[async_trait]
3131        impl Handler<u64> for LoggingActor {
3132            async fn handle(
3133                &mut self,
3134                _cx: &crate::Context<Self>,
3135                message: u64,
3136            ) -> anyhow::Result<()> {
3137                tracing::event!(Level::INFO, number = message);
3138                Ok(())
3139            }
3140        }
3141
3142        #[async_trait]
3143        impl Handler<Arc<Barrier>> for LoggingActor {
3144            async fn handle(
3145                &mut self,
3146                _cx: &crate::Context<Self>,
3147                message: Arc<Barrier>,
3148            ) -> anyhow::Result<()> {
3149                message.wait().await;
3150                Ok(())
3151            }
3152        }
3153
3154        #[async_trait]
3155        impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
3156            async fn handle(
3157                &mut self,
3158                _cx: &crate::Context<Self>,
3159                barriers: Arc<(Barrier, Barrier)>,
3160            ) -> anyhow::Result<()> {
3161                let inner = tracing::span!(Level::INFO, "child_span");
3162                let _inner_guard = inner.enter();
3163                barriers.0.wait().await;
3164                barriers.1.wait().await;
3165                Ok(())
3166            }
3167        }
3168
3169        trace_and_block(async {
3170            let proc = Proc::local();
3171            let (client, _) = proc.instance("client").unwrap();
3172            let handle = LoggingActor.spawn_detached().unwrap();
3173            handle.send(&client, "hello world".to_string()).unwrap();
3174            handle
3175                .send(&client, "hello world again".to_string())
3176                .unwrap();
3177            handle.send(&client, 123u64).unwrap();
3178
3179            LoggingActor::wait(&client, &handle).await;
3180
3181            let events = handle.cell().inner.recording.tail();
3182            assert_eq!(events.len(), 3);
3183            assert_eq!(events[0].json_value(), json!({ "message": "hello world" }));
3184            assert_eq!(
3185                events[1].json_value(),
3186                json!({ "message": "hello world again" })
3187            );
3188            assert_eq!(events[2].json_value(), json!({ "number": 123 }));
3189
3190            let stacks = {
3191                let barriers = Arc::new((Barrier::new(2), Barrier::new(2)));
3192                handle.send(&client, Arc::clone(&barriers)).unwrap();
3193                barriers.0.wait().await;
3194                let stacks = handle.cell().inner.recording.stacks();
3195                barriers.1.wait().await;
3196                stacks
3197            };
3198            assert_eq!(stacks.len(), 1);
3199            assert_eq!(stacks[0].len(), 1);
3200            assert_eq!(stacks[0][0].name(), "child_span");
3201        })
3202    }
3203}