1use std::any::Any;
15use std::any::TypeId;
16use std::collections::HashMap;
17use std::fmt;
18use std::future::Future;
19use std::hash::Hash;
20use std::hash::Hasher;
21use std::ops::Deref;
22use std::panic;
23use std::panic::AssertUnwindSafe;
24use std::pin::Pin;
25use std::sync::Arc;
26use std::sync::Mutex;
27use std::sync::OnceLock;
28use std::sync::Weak;
29use std::sync::atomic::AtomicU64;
30use std::sync::atomic::AtomicUsize;
31use std::sync::atomic::Ordering;
32use std::time::Duration;
33use std::time::SystemTime;
34
35use async_trait::async_trait;
36use dashmap::DashMap;
37use dashmap::mapref::entry::Entry;
38use dashmap::mapref::multiple::RefMulti;
39use futures::FutureExt;
40use hyperactor_telemetry::recorder;
41use hyperactor_telemetry::recorder::Recording;
42use serde::Deserialize;
43use serde::Serialize;
44use tokio::sync::mpsc;
45use tokio::sync::watch;
46use tokio::task::JoinHandle;
47use tracing::Instrument;
48
49use crate as hyperactor;
50use crate::Actor;
51use crate::ActorRef;
52use crate::Handler;
53use crate::Message;
54use crate::Named;
55use crate::RemoteMessage;
56use crate::accum::ReducerSpec;
57use crate::actor::ActorError;
58use crate::actor::ActorErrorKind;
59use crate::actor::ActorHandle;
60use crate::actor::ActorStatus;
61use crate::actor::Binds;
62use crate::actor::RemoteActor;
63use crate::actor::RemoteHandles;
64use crate::actor::Signal;
65use crate::attrs::Attrs;
66use crate::cap;
67use crate::clock::Clock;
68use crate::clock::ClockKind;
69use crate::clock::RealClock;
70use crate::data::Serialized;
71use crate::data::TypeInfo;
72use crate::mailbox::BoxedMailboxSender;
73use crate::mailbox::DeliveryError;
74use crate::mailbox::Mailbox;
75use crate::mailbox::MailboxMuxer;
76use crate::mailbox::MailboxSender;
77use crate::mailbox::MessageEnvelope;
78use crate::mailbox::OncePortHandle;
79use crate::mailbox::OncePortReceiver;
80use crate::mailbox::PanickingMailboxSender;
81use crate::mailbox::PortHandle;
82use crate::mailbox::PortReceiver;
83use crate::mailbox::Undeliverable;
84use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
85use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
86use crate::metrics::ACTOR_MESSAGES_RECEIVED;
87use crate::panic_handler;
88use crate::reference::ActorId;
89use crate::reference::Index;
90use crate::reference::PortId;
91use crate::reference::ProcId;
92use crate::reference::id;
93use crate::supervision::ActorSupervisionEvent;
94
/// Process-wide counter used by [`Proc::local`] to give each local proc a
/// distinct rank; every call takes the next value.
static NEXT_LOCAL_RANK: AtomicUsize = AtomicUsize::new(0);
97
/// A handle to a proc: the container in which actors are spawned and through
/// which their messages are routed. Envelopes for actors in this proc are
/// delivered locally; all others are handed to the proc's forwarder.
///
/// Cloning a `Proc` is cheap: all clones share the same state through an
/// `Arc`.
#[derive(Clone, Debug)]
pub struct Proc {
    inner: Arc<ProcState>,
}
108
/// The shared state behind a [`Proc`] handle.
#[derive(Debug)]
struct ProcState {
    /// The identity of this proc. Envelopes whose destination actor carries
    /// this proc ID are delivered locally.
    proc_id: ProcId,

    /// Local mailbox multiplexer. Mailboxes created in this proc are bound
    /// here, and locally addressed envelopes are posted to it.
    proc_muxer: MailboxMuxer,

    /// Sender used for envelopes addressed to actors in other procs.
    forwarder: BoxedMailboxSender,

    /// Allocated root actor names. Each name maps to the counter used to
    /// allocate pids within that root's tree. The root itself has pid 0, and
    /// the counter starts at 1.
    roots: DashMap<String, AtomicUsize>,

    /// Registry of spawned root actors, used for snapshots and shutdown.
    ledger: ActorLedger,

    /// Weak references to actor instances, keyed by actor ID. Used to
    /// resolve `ActorRef`s to local handles.
    // NOTE(review): insertion into this map is not visible in this file
    // section — presumably done when an `InstanceCell` is created; confirm.
    instances: DashMap<ActorId, WeakInstanceCell>,

    /// Port that receives supervision events not handled inside this proc.
    /// It can be set at most once.
    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,

    /// Clock used by this proc and exposed to its actor instances.
    clock: ClockKind,
}
138
/// A serializable, point-in-time view of a proc's actor ledger.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActorLedgerSnapshot {
    /// The tree of every root actor that was still alive when the snapshot
    /// was taken, keyed by the root's actor ID.
    pub roots: HashMap<ActorId, ActorTreeSnapshot>,
}
146
/// A serializable copy of a telemetry event taken from an actor's recording.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Event {
    /// The time at which the event was recorded.
    pub time: SystemTime,
    /// The event's fields as key/value pairs.
    pub fields: Vec<(String, recorder::Value)>,
    /// The event's sequence number, copied from the recorder event.
    pub seq: usize,
}
157
158impl From<recorder::Event> for Event {
159 fn from(event: recorder::Event) -> Event {
160 Event {
161 time: event.time,
162 fields: event.fields(),
163 seq: event.seq,
164 }
165 }
166}
167
/// A serializable snapshot of one actor and, recursively, its children.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActorTreeSnapshot {
    /// The actor's pid within its proc.
    pub pid: Index,

    /// The actor's type name. This comes from its registered `TypeInfo` when
    /// available, and otherwise from `std::any::type_name`.
    pub type_name: String,

    /// The actor's status when the snapshot was taken.
    pub status: ActorStatus,

    /// Counters for the actor.
    pub stats: ActorStats,

    /// The actor's exported named ports, as stored in its cell's
    /// `exported_named_ports` map.
    // NOTE(review): the key is presumably a port index and the value the
    // handled message type name — confirm.
    pub handlers: HashMap<u64, String>,

    /// Snapshots of the actor's children, keyed by child pid.
    pub children: HashMap<Index, ActorTreeSnapshot>,

    /// The tail of the actor's telemetry recording.
    pub events: Vec<Event>,

    /// The recording's span stacks, each given as a list of span names.
    pub spans: Vec<Vec<String>>,
}
198
199impl Hash for ActorTreeSnapshot {
200 fn hash<H: Hasher>(&self, state: &mut H) {
201 self.pid.hash(state);
202 }
203}
204
/// Per-actor counters included in an [`ActorTreeSnapshot`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Default)]
pub struct ActorStats {
    /// Value of the instance's processed-messages counter. The actor loop
    /// increments it once per loop iteration, and iterations include
    /// signals and supervision events as well as messages.
    num_processed_messages: u64,
}
212
213impl fmt::Display for ActorStats {
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 write!(f, "num_processed_messages={}", self.num_processed_messages)
216 }
217}
218
/// Registry of a proc's spawned root actors.
#[derive(Debug)]
struct ActorLedger {
    /// Weak references to root actor cells, keyed by root actor ID. Entries
    /// whose cell has been dropped are skipped when taking snapshots.
    roots: DashMap<ActorId, WeakInstanceCell>,
}
224
225impl ActorLedger {
226 fn new() -> Self {
227 Self {
228 roots: DashMap::new(),
229 }
230 }
231
232 fn insert(
233 &self,
234 root_actor_id: ActorId,
235 root_actor_cell: WeakInstanceCell,
236 ) -> Result<(), anyhow::Error> {
237 match self.roots.insert(root_actor_id.clone(), root_actor_cell) {
238 None => Ok(()),
239 Some(current_cell) => {
242 let debugging_msg = match current_cell.upgrade() {
243 Some(cell) => format!("the stored cell's actor ID is {}", cell.actor_id()),
244 None => "the stored cell has been dropped".to_string(),
245 };
246
247 Err(anyhow::anyhow!(
248 "actor '{root_actor_id}' has already been added to ledger: {debugging_msg}"
249 ))
250 }
251 }
252 }
253
254 fn snapshot(&self) -> ActorLedgerSnapshot {
256 let roots = self
257 .roots
258 .iter()
259 .flat_map(|r| {
260 let (actor_id, weak_cell) = r.pair();
261 weak_cell
265 .upgrade()
266 .map(|cell| (actor_id.clone(), Self::get_actor_tree_snapshot(&cell)))
267 })
268 .collect();
269
270 ActorLedgerSnapshot { roots }
271 }
272
273 fn get_actor_tree_snapshot(cell: &InstanceCell) -> ActorTreeSnapshot {
274 let children = cell
276 .child_iter()
277 .map(|child| (child.pid(), Self::get_actor_tree_snapshot(child.value())))
278 .collect();
279
280 ActorTreeSnapshot {
281 pid: cell.actor_id().pid(),
282 type_name: cell.inner.actor_type.type_name().to_string(),
283 status: cell.status().borrow().clone(),
284 stats: ActorStats {
285 num_processed_messages: cell.inner.num_processed_messages.load(Ordering::SeqCst),
286 },
287 handlers: cell
288 .inner
289 .exported_named_ports
290 .iter()
291 .map(|entry| (*entry.key(), entry.value().to_string()))
292 .collect(),
293 children,
294 events: cell
295 .inner
296 .recording
297 .tail()
298 .into_iter()
299 .map(Event::from)
300 .collect(),
301 spans: cell
302 .inner
303 .recording
304 .stacks()
305 .into_iter()
306 .map(|stack| {
307 stack
308 .into_iter()
309 .map(|meta| meta.name().to_string())
310 .collect()
311 })
312 .collect(),
313 }
314 }
315}
316
317impl Proc {
318 pub fn new(proc_id: ProcId, forwarder: BoxedMailboxSender) -> Self {
320 Self::new_with_clock(proc_id, forwarder, ClockKind::default())
321 }
322
323 pub fn new_with_clock(
325 proc_id: ProcId,
326 forwarder: BoxedMailboxSender,
327 clock: ClockKind,
328 ) -> Self {
329 Self {
330 inner: Arc::new(ProcState {
331 proc_id,
332 proc_muxer: MailboxMuxer::new(),
333 forwarder,
334 roots: DashMap::new(),
335 ledger: ActorLedger::new(),
336 instances: DashMap::new(),
337 supervision_coordinator_port: OnceLock::new(),
338 clock,
339 }),
340 }
341 }
342
343 pub fn set_supervision_coordinator(
346 &self,
347 port: PortHandle<ActorSupervisionEvent>,
348 ) -> Result<(), anyhow::Error> {
349 self.state()
350 .supervision_coordinator_port
351 .set(port)
352 .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
353 }
354
355 fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
356 let result = match self.state().supervision_coordinator_port.get() {
357 Some(port) => port.send(event).map_err(anyhow::Error::from),
358 None => Err(anyhow::anyhow!(
359 "coordinator port is not set for proc {}",
360 self.proc_id()
361 )),
362 };
363 if let Err(err) = result {
364 tracing::error!(
365 "proc {}: could not propagate supervision event: {:?}: crashing",
366 self.proc_id(),
367 err
368 );
369
370 std::process::exit(1);
371 }
372 }
373
374 pub fn local() -> Self {
377 let proc_id = ProcId::Ranked(id!(local), NEXT_LOCAL_RANK.fetch_add(1, Ordering::Relaxed));
380 Proc::new(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
382 }
383
384 pub fn proc_id(&self) -> &ProcId {
386 &self.state().proc_id
387 }
388
389 pub fn forwarder(&self) -> &BoxedMailboxSender {
392 &self.inner.forwarder
393 }
394
395 fn state(&self) -> &ProcState {
397 self.inner.as_ref()
398 }
399
400 pub fn clock(&self) -> &ClockKind {
402 &self.state().clock
403 }
404
405 pub fn ledger_snapshot(&self) -> ActorLedgerSnapshot {
407 self.state().ledger.snapshot()
408 }
409
410 pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
412 let actor_id: ActorId = self.allocate_root_id(name)?;
413 Ok(self.bind_mailbox(actor_id))
414 }
415
416 pub fn attach_child(&self, parent_id: &ActorId) -> Result<Mailbox, anyhow::Error> {
418 let actor_id: ActorId = self.allocate_child_id(parent_id)?;
419 Ok(self.bind_mailbox(actor_id))
420 }
421
422 fn bind_mailbox(&self, actor_id: ActorId) -> Mailbox {
424 let mbox = Mailbox::new(actor_id, BoxedMailboxSender::new(self.downgrade()));
425
426 self.state().proc_muxer.bind_mailbox(mbox.clone());
429 mbox
430 }
431
432 pub fn attach_actor<R, M>(
435 &self,
436 name: &str,
437 ) -> Result<(Mailbox, ActorRef<R>, PortReceiver<M>), anyhow::Error>
438 where
439 M: RemoteMessage,
440 R: RemoteActor + RemoteHandles<M>,
441 {
442 let mbox = self.attach(name)?;
443 let (handle, rx) = mbox.open_port::<M>();
444 handle.bind_to(M::port());
445 let actor_ref = ActorRef::attest(mbox.actor_id().clone());
446 Ok((mbox, actor_ref, rx))
447 }
448
449 pub async fn spawn<A: Actor>(
452 &self,
453 name: &str,
454 params: A::Params,
455 ) -> Result<ActorHandle<A>, anyhow::Error> {
456 let actor_id = self.allocate_root_id(name)?;
457 let _ = tracing::debug_span!(
458 "spawn_actor",
459 actor_name = name,
460 actor_type = std::any::type_name::<A>(),
461 actor_id = actor_id.to_string(),
462 );
463 let instance = Instance::new(self.clone(), actor_id.clone(), false, None);
464 let actor = A::new(params).await?;
465 self.state()
469 .ledger
470 .insert(actor_id.clone(), instance.cell.downgrade())?;
471
472 instance.start(actor).await
473 }
474
475 pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
481 let actor_id = self.allocate_root_id(name)?;
482 let _ = tracing::debug_span!(
483 "actor_instance",
484 actor_name = name,
485 actor_type = std::any::type_name::<()>(),
486 actor_id = actor_id.to_string(),
487 );
488
489 let instance = Instance::new(self.clone(), actor_id.clone(), true, None);
490 let handle = ActorHandle::new(instance.cell.clone(), instance.ports.clone());
491
492 instance.change_status(ActorStatus::Client);
493
494 Ok((instance, handle))
495 }
496
497 async fn spawn_child<A: Actor>(
503 &self,
504 parent: InstanceCell,
505 params: A::Params,
506 ) -> Result<ActorHandle<A>, anyhow::Error> {
507 let actor_id = self.allocate_child_id(parent.actor_id())?;
508 let instance = Instance::new(self.clone(), actor_id, false, Some(parent.clone()));
509 let actor = A::new(params).await?;
510 instance.start(actor).await
511 }
512
513 pub fn abort_root_actor(&self, root: &ActorId) -> Option<ActorId> {
517 self.state()
518 .ledger
519 .roots
520 .get(root)
521 .into_iter()
522 .flat_map(|e| e.upgrade())
523 .map(|cell| {
524 let h = cell.actor_task_handle().unwrap();
530 tracing::debug!("{}: aborting {:?}", root, h);
531 h.abort();
532 root.clone()
533 })
534 .next()
535 }
536
537 pub fn stop_actor(&self, actor_id: &ActorId) -> Option<watch::Receiver<ActorStatus>> {
540 if let Some(entry) = self.state().ledger.roots.get(actor_id) {
541 match entry.value().upgrade() {
542 None => None, Some(cell) => {
544 tracing::info!("sending stop signal to {}", cell.actor_id());
545 if let Err(err) = cell.signal(Signal::DrainAndStop) {
546 tracing::error!(
547 "{}: failed to send stop signal to pid {}: {:?}",
548 self.proc_id(),
549 cell.pid(),
550 err
551 );
552 None
553 } else {
554 Some(cell.status().clone())
555 }
556 }
557 }
558 } else {
559 tracing::error!("no actor {} found in {} roots", actor_id, self.proc_id());
560 None
561 }
562 }
563
564 #[hyperactor::instrument]
570 pub async fn destroy_and_wait(
571 &mut self,
572 timeout: Duration,
573 skip_waiting: Option<&ActorId>,
574 ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
575 tracing::debug!("{}: proc stopping", self.proc_id());
576
577 let mut statuses = HashMap::new();
578 for actor_id in self
579 .state()
580 .ledger
581 .roots
582 .iter()
583 .map(|entry| entry.key().clone())
584 .collect::<Vec<_>>()
585 {
586 if let Some(status) = self.stop_actor(&actor_id) {
587 statuses.insert(actor_id, status);
588 }
589 }
590 tracing::debug!("{}: proc stopped", self.proc_id());
591
592 let waits: Vec<_> = statuses
593 .iter_mut()
594 .filter(|(actor_id, _)| Some(*actor_id) != skip_waiting)
595 .map(|(actor_id, root)| {
596 let actor_id = actor_id.clone();
597 async move {
598 RealClock
599 .timeout(
600 timeout,
601 root.wait_for(|state: &ActorStatus| {
602 matches!(*state, ActorStatus::Stopped)
603 }),
604 )
605 .await
606 .ok()
607 .map(|_| actor_id)
608 }
609 })
610 .collect();
611
612 let results = futures::future::join_all(waits).await;
613 let stopped_actors: Vec<_> = results
614 .iter()
615 .filter_map(|actor_id| actor_id.as_ref())
616 .cloned()
617 .collect();
618 let aborted_actors: Vec<_> = statuses
619 .iter()
620 .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
621 .map(|(actor_id, _)| {
622 let _: Option<ActorId> = self.abort_root_actor(actor_id);
623 actor_id.clone()
630 })
631 .collect();
632
633 tracing::info!(
634 "destroy_and_wait: {} actors stopped, {} actors aborted",
635 stopped_actors.len(),
636 aborted_actors.len()
637 );
638 Ok((stopped_actors, aborted_actors))
639 }
640
641 #[hyperactor::instrument]
643 fn allocate_root_id(&self, name: &str) -> Result<ActorId, anyhow::Error> {
644 let name = name.to_string();
645 match self.state().roots.entry(name.to_string()) {
646 Entry::Vacant(entry) => {
647 entry.insert(AtomicUsize::new(1));
648 }
649 Entry::Occupied(_) => {
650 anyhow::bail!("an actor with name '{}' has already been spawned", name)
651 }
652 }
653 Ok(ActorId(self.state().proc_id.clone(), name.to_string(), 0))
654 }
655
656 pub(crate) fn allocate_child_id(&self, parent_id: &ActorId) -> Result<ActorId, anyhow::Error> {
658 assert_eq!(*parent_id.proc_id(), self.state().proc_id);
659 let pid = match self.state().roots.get(parent_id.name()) {
660 None => anyhow::bail!(
661 "no actor named {} in proc {}",
662 parent_id.name(),
663 self.state().proc_id
664 ),
665 Some(next_pid) => next_pid.fetch_add(1, Ordering::Relaxed),
666 };
667 Ok(parent_id.child_id(pid))
668 }
669
670 fn downgrade(&self) -> WeakProc {
671 WeakProc::new(self)
672 }
673}
674
675#[async_trait]
676impl MailboxSender for Proc {
677 fn post(
678 &self,
679 envelope: MessageEnvelope,
680 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
681 ) {
682 if envelope.dest().actor_id().proc_id() == &self.state().proc_id {
683 self.state().proc_muxer.post(envelope, return_handle)
684 } else {
685 self.state().forwarder.post(envelope, return_handle)
686 }
687 }
688}
689
/// A non-owning reference to a proc's shared state.
///
/// Mailboxes created by a proc send through this rather than through a
/// [`Proc`], so they do not keep the proc's state alive.
#[derive(Debug)]
struct WeakProc(Weak<ProcState>);
692
693impl WeakProc {
694 fn new(proc: &Proc) -> Self {
695 Self(Arc::downgrade(&proc.inner))
696 }
697
698 fn upgrade(&self) -> Option<Proc> {
699 self.0.upgrade().map(|inner| Proc { inner })
700 }
701}
702
703#[async_trait]
704impl MailboxSender for WeakProc {
705 fn post(
706 &self,
707 envelope: MessageEnvelope,
708 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
709 ) {
710 match self.upgrade() {
711 Some(proc) => proc.post(envelope, return_handle),
712 None => envelope.undeliverable(
713 DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
714 return_handle,
715 ),
716 }
717 }
718}
719
/// A boxed, one-shot unit of work queued for an actor's loop.
///
/// The closure receives the actor and its instance mutably. It returns a
/// future that may borrow both for the lifetime `'a`.
struct WorkCell<A: Actor + Send>(
    Box<
        dyn for<'a> FnOnce(
                &'a mut A,
                &'a mut Instance<A>,
            )
                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
            + Send
            + Sync,
    >,
);
733
734impl<A: Actor + Send> WorkCell<A> {
735 fn new(
737 f: impl for<'a> FnOnce(
738 &'a mut A,
739 &'a mut Instance<A>,
740 )
741 -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
742 + Send
743 + Sync
744 + 'static,
745 ) -> Self {
746 Self(Box::new(f))
747 }
748
749 fn handle<'a>(
751 self,
752 actor: &'a mut A,
753 instance: &'a mut Instance<A>,
754 ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
755 (self.0)(actor, instance)
756 }
757}
758
/// The context passed to a message handler. It holds the handling actor's
/// instance and the headers of the message being handled, and it
/// dereferences to [`Instance`].
pub struct Context<'a, A: Actor> {
    instance: &'a Instance<A>,
    headers: Attrs,
}
764
765impl<'a, A: Actor> Context<'a, A> {
766 pub fn new(instance: &'a Instance<A>, headers: Attrs) -> Self {
768 Self { instance, headers }
769 }
770
771 pub fn headers(&self) -> &Attrs {
773 &self.headers
774 }
775}
776
777impl<A: Actor> Deref for Context<'_, A> {
778 type Target = Instance<A>;
779
780 fn deref(&self) -> &Self::Target {
781 self.instance
782 }
783}
784
/// The runtime state of a single actor of type `A`. It holds the actor's
/// mailbox, ports, work queue and status channel.
pub struct Instance<A: Actor> {
    /// The proc this actor runs in.
    proc: Proc,

    /// Shared cell for this actor (ID, status, parent/children, task handle).
    cell: InstanceCell,

    /// This actor's mailbox, bound into the proc's muxer at creation.
    mailbox: Mailbox,

    /// Receivers for actor-loop signals and supervision events. `None` for
    /// detached instances. Taken by `serve` when the loop starts.
    actor_loop_receivers: Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,

    /// This actor's ports. They own the sending half of `work_rx`.
    ports: Arc<Ports<A>>,

    /// Queue of work items processed by the actor loop.
    work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,

    /// Publishes this actor's status changes.
    status_tx: watch::Sender<ActorStatus>,

    /// Tracing span for the current status, replaced on every status change.
    status_span: Mutex<tracing::Span>,

    /// Set from the proc's clock when the instance is created. It is not
    /// read anywhere in this file section.
    _last_status_change: Arc<tokio::time::Instant>,
}
814
815impl<A: Actor> Instance<A> {
816 pub(crate) fn new(
818 proc: Proc,
819 actor_id: ActorId,
820 detached: bool,
821 parent: Option<InstanceCell>,
822 ) -> Self {
823 let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
825 let (work_tx, work_rx) = mpsc::unbounded_channel();
826 let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
827 proc.state().proc_muxer.bind_mailbox(mailbox.clone());
828 let (status_tx, status_rx) = watch::channel(ActorStatus::Created);
829
830 let actor_type = match TypeInfo::of::<A>() {
831 Some(info) => ActorType::Named(info),
832 None => ActorType::Anonymous(std::any::type_name::<A>()),
833 };
834 let ais = actor_id.to_string();
835
836 let actor_loop_ports = if detached {
837 None
838 } else {
839 let (signal_port, signal_receiver) = ports.open_message_port().unwrap();
840 let (supervision_port, supervision_receiver) = mailbox.open_port();
841 Some((
842 (signal_port, supervision_port),
843 (signal_receiver, supervision_receiver),
844 ))
845 };
846
847 let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();
848
849 let cell = InstanceCell::new(
850 actor_id,
851 actor_type,
852 proc.clone(),
853 actor_loop,
854 status_rx,
855 parent,
856 ports.clone(),
857 );
858 let start = proc.clock().now();
859
860 Self {
861 proc,
862 cell,
863 mailbox,
864 actor_loop_receivers,
865 ports,
866 work_rx,
867 status_tx,
868 status_span: Mutex::new(tracing::debug_span!(
869 "actor_status",
870 actor_id = ais,
871 name = "created"
872 )),
873 _last_status_change: Arc::new(start),
874 }
875 }
876
877 fn change_status(&self, new: ActorStatus) {
880 self.status_tx.send_replace(new.clone());
882 let actor_id_str = self.self_id().to_string();
883 *self.status_span.lock().expect("can't change") = tracing::debug_span!(
884 "actor_status",
885 actor_id = actor_id_str,
886 name = new.arm().unwrap_or_default()
887 );
888 }
889
890 pub fn self_id(&self) -> &ActorId {
892 self.mailbox.actor_id()
893 }
894
895 #[allow(clippy::result_large_err)] pub fn stop(&self) -> Result<(), ActorError> {
898 tracing::info!("Instance::stop called, {}", self.cell.actor_id());
899 self.cell.signal(Signal::DrainAndStop)
900 }
901
902 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
907 self.mailbox.open_port()
908 }
909
910 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
914 self.mailbox.open_once_port()
915 }
916
917 pub fn post(&self, port_id: PortId, headers: Attrs, message: Serialized) {
919 <Self as cap::sealed::CanSend>::post(self, port_id, headers, message)
920 }
921
922 #[allow(clippy::result_large_err)] pub fn self_message_with_delay<M>(&self, message: M, delay: Duration) -> Result<(), ActorError>
925 where
926 M: Message,
927 A: Handler<M>,
928 {
929 let port = self.port();
930 let self_id = self.self_id().clone();
931 let clock = self.proc.state().clock.clone();
932 tokio::spawn(async move {
933 clock.non_advancing_sleep(delay).await;
934 if let Err(e) = port.send(message) {
935 tracing::info!("{}: error sending delayed message: {}", self_id, e);
938 }
939 });
940 Ok(())
941 }
942
943 #[hyperactor::instrument]
946 async fn start(self, actor: A) -> Result<ActorHandle<A>, anyhow::Error> {
947 let instance_cell = self.cell.clone();
948 let actor_id = self.cell.actor_id().clone();
949 let actor_handle = ActorHandle::new(self.cell.clone(), self.ports.clone());
950 let actor_task_handle =
951 A::spawn_server_task(panic_handler::with_backtrace_tracking(self.serve(actor)));
952 tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
953 instance_cell
954 .inner
955 .actor_task_handle
956 .set(actor_task_handle)
957 .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));
958
959 Ok(actor_handle)
960 }
961
962 async fn serve(mut self, mut actor: A) {
963 let actor_loop_receivers = self.actor_loop_receivers.take().unwrap();
964
965 let result = self.run_actor_tree(&mut actor, actor_loop_receivers).await;
966
967 let (actor_status, event) = match result {
968 Ok(_) => (ActorStatus::Stopped, None),
969 Err(ActorError {
970 kind: ActorErrorKind::UnhandledSupervisionEvent(event),
971 ..
972 }) => (event.actor_status.clone(), Some(event)),
973 Err(err) => (
974 ActorStatus::Failed(err.to_string()),
975 Some(ActorSupervisionEvent {
976 actor_id: self.cell.actor_id().clone(),
977 actor_status: ActorStatus::Failed(err.to_string()),
978 message_headers: None,
979 caused_by: None,
980 }),
981 ),
982 };
983
984 if let Some(parent) = self.cell.maybe_unlink_parent() {
985 if let Some(event) = event {
986 parent.send_supervision_event_or_crash(event);
988 }
989 if let Err(err) = parent.signal(Signal::ChildStopped(self.cell.pid())) {
992 tracing::error!(
993 "{}: failed to send stop message to parent pid {}: {:?}",
994 self.self_id(),
995 parent.pid(),
996 err
997 );
998 }
999 } else {
1000 if let Some(event) = event {
1006 self.proc.handle_supervision_event(event);
1007 }
1008 }
1009 self.change_status(actor_status);
1010 }
1011
1012 async fn run_actor_tree(
1015 &mut self,
1016 actor: &mut A,
1017 mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1018 ) -> Result<(), ActorError> {
1019 let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers))
1025 .catch_unwind()
1026 .await
1027 {
1028 Ok(result) => result,
1029 Err(err) => {
1030 let err_msg = err
1032 .downcast_ref::<&str>()
1033 .copied()
1034 .or_else(|| err.downcast_ref::<String>().map(|s| s.as_str()))
1035 .unwrap_or("panic cannot be downcasted");
1036
1037 let backtrace = panic_handler::take_panic_backtrace()
1038 .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
1039 Err(ActorError::new(
1040 self.self_id().clone(),
1041 ActorErrorKind::Panic(anyhow::anyhow!("{}\n{}", err_msg, backtrace)),
1042 ))
1043 }
1044 };
1045
1046 if let Err(ref err) = result {
1047 tracing::error!("{}: actor failure: {}", self.self_id(), err);
1048 }
1049 self.change_status(ActorStatus::Stopping);
1050
1051 let mut to_unlink = Vec::new();
1054 for child in self.cell.child_iter() {
1055 if let Err(err) = child.value().signal(Signal::Stop) {
1056 tracing::error!(
1057 "{}: failed to send stop signal to child pid {}: {:?}",
1058 self.self_id(),
1059 child.key(),
1060 err
1061 );
1062 to_unlink.push(child.value().clone());
1063 }
1064 }
1065 for child in to_unlink {
1067 self.cell.unlink(&child);
1068 }
1069
1070 let (mut signal_receiver, _) = actor_loop_receivers;
1071 while self.cell.child_count() > 0 {
1072 match signal_receiver.recv().await? {
1073 Signal::ChildStopped(pid) => {
1074 assert!(self.cell.get_child(pid).is_none());
1075 }
1076 _ => (),
1077 }
1078 }
1079
1080 result
1081 }
1082
1083 async fn run(
1085 &mut self,
1086 actor: &mut A,
1087 actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1088 ) -> Result<(), ActorError> {
1089 tracing::debug!("entering actor loop: {}", self.self_id());
1090
1091 let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;
1092
1093 self.change_status(ActorStatus::Initializing);
1094 actor
1095 .init(self)
1096 .await
1097 .map_err(|err| ActorError::new(self.self_id().clone(), ActorErrorKind::Init(err)))?;
1098 let need_drain;
1099 'messages: loop {
1100 self.change_status(ActorStatus::Idle);
1101 let metric_pairs =
1102 hyperactor_telemetry::kv_pairs!("actor_id" => self.self_id().to_string());
1103 tokio::select! {
1104 work = self.work_rx.recv() => {
1105 ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
1106 ACTOR_MESSAGE_QUEUE_SIZE.add(-1, metric_pairs);
1107 let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
1108 let work = work.expect("inconsistent work queue state");
1109 if let Err(err) = work.handle(actor, self).await {
1110 for supervision_event in supervision_event_receiver.drain() {
1111 self.handle_supervision_event(actor, supervision_event).await?;
1112 }
1113 return Err(ActorError::new(self.self_id().clone(), ActorErrorKind::Processing(err)));
1114 }
1115 }
1116 signal = signal_receiver.recv() => {
1117 let signal = signal.map_err(ActorError::from);
1118 tracing::debug!("Received signal {signal:?}");
1119 match signal? {
1120 signal@(Signal::Stop | Signal::DrainAndStop) => {
1121 need_drain = matches!(signal, Signal::DrainAndStop);
1122 break 'messages;
1123 },
1124 Signal::ChildStopped(pid) => {
1125 assert!(self.cell.get_child(pid).is_none());
1126 },
1127 }
1128 }
1129 Ok(supervision_event) = supervision_event_receiver.recv() => {
1130 self.handle_supervision_event(actor, supervision_event).await?;
1131 }
1132 }
1133 self.cell
1134 .inner
1135 .num_processed_messages
1136 .fetch_add(1, Ordering::SeqCst);
1137 }
1138
1139 if need_drain {
1140 self.change_status(ActorStatus::Stopping);
1141 let mut n = 0;
1142 while let Ok(work) = self.work_rx.try_recv() {
1143 if let Err(err) = work.handle(actor, self).await {
1144 return Err(ActorError::new(
1145 self.self_id().clone(),
1146 ActorErrorKind::Processing(err),
1147 ));
1148 }
1149 n += 1;
1150 }
1151 tracing::debug!("drained {} messages", n);
1152 }
1153 tracing::debug!("exited actor loop: {}", self.self_id());
1154 self.change_status(ActorStatus::Stopped);
1155 Ok(())
1156 }
1157
1158 async fn handle_supervision_event(
1159 &self,
1160 actor: &mut A,
1161 supervision_event: ActorSupervisionEvent,
1162 ) -> Result<(), ActorError> {
1163 match actor
1165 .handle_supervision_event(self, &supervision_event)
1166 .await
1167 {
1168 Ok(true) => {
1169 Ok(())
1171 }
1172 Ok(false) => {
1173 let supervision_event = ActorSupervisionEvent {
1175 actor_id: self.self_id().clone(),
1176 actor_status: ActorStatus::Failed(
1177 "did not handle supervision event".to_string(),
1178 ),
1179 message_headers: None,
1180 caused_by: Some(Box::new(supervision_event)),
1181 };
1182 Err(supervision_event.into())
1183 }
1184 Err(err) => {
1185 let supervision_event = ActorSupervisionEvent {
1188 actor_id: self.self_id().clone(),
1189 actor_status: ActorStatus::Failed(format!(
1190 "failed to handle supervision event: {}",
1191 err
1192 )),
1193 message_headers: None,
1194 caused_by: Some(Box::new(supervision_event)),
1195 };
1196 Err(supervision_event.into())
1197 }
1198 }
1199 }
1200
1201 async unsafe fn handle_message<M: Message>(
1202 &mut self,
1203 actor: &mut A,
1204 type_info: Option<&'static TypeInfo>,
1205 headers: Attrs,
1206 message: M,
1207 ) -> Result<(), anyhow::Error>
1208 where
1209 A: Handler<M>,
1210 {
1211 let handler = type_info.map(|info| {
1212 (
1213 info.typename().to_string(),
1214 unsafe {
1216 info.arm_unchecked(&message as *const M as *const ())
1217 .map(str::to_string)
1218 },
1219 )
1220 });
1221
1222 let _ = self.change_status(ActorStatus::Processing(
1223 self.clock().system_time_now(),
1224 handler,
1225 ));
1226 let span = self.status_span.lock().unwrap().clone();
1227
1228 let context = Context::new(self, headers);
1229 actor.handle(&context, message).instrument(span).await
1233 }
1234
1235 pub fn port<M: Message>(&self) -> PortHandle<M>
1238 where
1239 A: Handler<M>,
1240 {
1241 self.ports.get()
1242 }
1243
1244 pub fn handle(&self) -> ActorHandle<A> {
1246 ActorHandle::new(self.cell.clone(), Arc::clone(&self.ports))
1247 }
1248
1249 pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
1251 self.cell.bind(self.ports.as_ref())
1252 }
1253
1254 #[doc(hidden)]
1256 pub fn mailbox_for_py(&self) -> &Mailbox {
1257 &self.mailbox
1258 }
1259
1260 pub fn clock(&self) -> &(impl Clock + use<A>) {
1262 &self.proc.state().clock
1263 }
1264
1265 pub fn proc(&self) -> &Proc {
1267 &self.proc
1268 }
1269}
1270
1271impl<A: Actor> Drop for Instance<A> {
1272 fn drop(&mut self) {
1273 self.status_tx.send_if_modified(|status| {
1274 if status.is_terminal() {
1275 false
1276 } else {
1277 *status = ActorStatus::Stopped;
1278 true
1279 }
1280 });
1281 }
1282}
1283
1284impl<A: Actor> cap::sealed::CanSend for Instance<A> {
1285 fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
1286 let envelope = MessageEnvelope::new(self.self_id().clone(), dest, data, headers);
1287 self.proc.post(envelope, self.ports.get());
1288 }
1289 fn actor_id(&self) -> &ActorId {
1290 self.self_id()
1291 }
1292}
1293
1294impl<A: Actor> cap::sealed::CanSend for &Instance<A> {
1295 fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
1296 (*self).post(dest, headers, data)
1297 }
1298 fn actor_id(&self) -> &ActorId {
1299 self.self_id()
1300 }
1301}
1302
1303impl<A: Actor> cap::sealed::CanOpenPort for Instance<A> {
1304 fn mailbox(&self) -> &Mailbox {
1305 &self.mailbox
1306 }
1307}
1308
1309impl<A: Actor> cap::sealed::CanOpenPort for &Instance<A> {
1310 fn mailbox(&self) -> &Mailbox {
1311 &self.mailbox
1312 }
1313}
1314
1315impl<A: Actor> cap::sealed::CanSplitPort for Instance<A> {
1316 fn split(&self, port_id: PortId, reducer_spec: Option<ReducerSpec>) -> anyhow::Result<PortId> {
1317 self.mailbox.split(port_id, reducer_spec)
1318 }
1319}
1320
1321#[async_trait]
1322impl<A: Actor> cap::sealed::CanSpawn for Instance<A> {
1323 async fn spawn<C: Actor>(&self, params: C::Params) -> anyhow::Result<ActorHandle<C>> {
1324 self.proc.spawn_child(self.cell.clone(), params).await
1325 }
1326}
1327
1328impl<A: Actor> cap::sealed::CanResolveActorRef for Instance<A> {
1329 fn resolve_actor_ref<R: RemoteActor + Actor>(
1330 &self,
1331 actor_ref: &ActorRef<R>,
1332 ) -> Option<ActorHandle<R>> {
1333 self.proc
1334 .inner
1335 .instances
1336 .get(actor_ref.actor_id())?
1337 .upgrade()?
1338 .downcast_handle()
1339 }
1340}
1341
1342impl<A: Actor> cap::sealed::CanSend for Context<'_, A> {
1343 fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
1344 <Instance<A> as cap::sealed::CanSend>::post(self, dest, headers, data)
1345 }
1346 fn actor_id(&self) -> &ActorId {
1347 self.self_id()
1348 }
1349}
1350
1351impl<A: Actor> cap::sealed::CanSend for &Context<'_, A> {
1352 fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
1353 <Instance<A> as cap::sealed::CanSend>::post(self, dest, headers, data)
1354 }
1355 fn actor_id(&self) -> &ActorId {
1356 self.self_id()
1357 }
1358}
1359
1360impl<A: Actor> cap::sealed::CanOpenPort for Context<'_, A> {
1361 fn mailbox(&self) -> &Mailbox {
1362 <Instance<A> as cap::sealed::CanOpenPort>::mailbox(self)
1363 }
1364}
1365
1366impl<A: Actor> cap::sealed::CanOpenPort for &Context<'_, A> {
1367 fn mailbox(&self) -> &Mailbox {
1368 <Instance<A> as cap::sealed::CanOpenPort>::mailbox(self)
1369 }
1370}
1371
1372impl<A: Actor> cap::sealed::CanSplitPort for Context<'_, A> {
1373 fn split(&self, port_id: PortId, reducer_spec: Option<ReducerSpec>) -> anyhow::Result<PortId> {
1374 <Instance<A> as cap::sealed::CanSplitPort>::split(self, port_id, reducer_spec)
1375 }
1376}
1377
1378#[async_trait]
1379impl<A: Actor> cap::sealed::CanSpawn for Context<'_, A> {
1380 async fn spawn<C: Actor>(&self, params: C::Params) -> anyhow::Result<ActorHandle<C>> {
1381 <Instance<A> as cap::sealed::CanSpawn>::spawn(self, params).await
1382 }
1383}
1384
1385impl<A: Actor> cap::sealed::CanResolveActorRef for Context<'_, A> {
1386 fn resolve_actor_ref<R: RemoteActor + Actor>(
1387 &self,
1388 actor_ref: &ActorRef<R>,
1389 ) -> Option<ActorHandle<R>> {
1390 <Instance<A> as cap::sealed::CanResolveActorRef>::resolve_actor_ref(self, actor_ref)
1391 }
1392}
1393
/// Identifies an actor's type so that its name can be reported via
/// `ActorType::type_name`.
#[derive(Debug)]
enum ActorType {
    /// Type described by a [`TypeInfo`] entry; the name comes from `TypeInfo::typename`.
    Named(&'static TypeInfo),
    /// Type known only by a static name string.
    Anonymous(&'static str),
}
1399
1400impl ActorType {
1401 fn type_name(&self) -> &'static str {
1403 match self {
1404 Self::Named(info) => info.typename(),
1405 Self::Anonymous(name) => name,
1406 }
1407 }
1408}
1409
/// A cheaply-cloneable, shared handle to an actor instance's runtime state.
///
/// Clones share the same underlying `InstanceState` through an [`Arc`];
/// [`InstanceCell::downgrade`] yields a non-owning [`WeakInstanceCell`].
#[derive(Clone, Debug)]
pub struct InstanceCell {
    inner: Arc<InstanceState>,
}
1419
/// Shared runtime state of one actor instance, owned through [`InstanceCell`].
///
/// On drop, the instance unlinks itself from its parent and removes itself
/// from its proc's instance map.
#[derive(Debug)]
struct InstanceState {
    /// The actor's id; its pid keys this instance in the parent's `children`.
    actor_id: ActorId,

    /// Source of the actor's type name (see `ActorType::type_name`).
    actor_type: ActorType,

    /// The proc this instance is registered with. `InstanceCell::new` inserts
    /// a weak reference into `proc.inner.instances`, and `Drop` removes it.
    proc: Proc,

    /// Signal and supervision-event ports of the actor loop. `None` marks a
    /// detached instance: signals sent to it are logged and ignored, and
    /// supervision events sent to it terminate the process.
    actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,

    /// Receiver observing the actor's current status.
    status: watch::Receiver<ActorStatus>,

    /// Weak link to the parent instance; never upgrades for root instances.
    parent: WeakInstanceCell,

    /// Strong references to child instances, keyed by pid.
    children: DashMap<Index, InstanceCell>,

    /// Join handle of the actor's task, set at most once
    /// (presumably by the spawn path — not visible in this section).
    actor_task_handle: OnceLock<JoinHandle<()>>,

    /// Bound port indices mapped to the bound message type name; filled by
    /// `InstanceCell::bind`.
    exported_named_ports: DashMap<u64, &'static str>,

    /// Count of processed messages, starting at 0
    /// (NOTE(review): incremented outside this section — confirm where).
    num_processed_messages: AtomicU64,

    /// Telemetry recording created via `recorder().record(64)`
    /// (NOTE(review): meaning of `64` not visible here — presumably a capacity).
    recording: Recording,

    /// Type-erased `Ports<A>` for the actor; recovered by
    /// `InstanceCell::downcast_handle`.
    ports: Arc<dyn Any + Send + Sync>,
}
1460
1461impl InstanceState {
1462 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1465 self.parent
1466 .upgrade()
1467 .filter(|parent| parent.inner.unlink(self))
1468 }
1469
1470 fn unlink(&self, child: &InstanceState) -> bool {
1472 assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
1473 self.children.remove(&child.actor_id.pid()).is_some()
1474 }
1475}
1476
1477impl InstanceCell {
1478 fn new(
1481 actor_id: ActorId,
1482 actor_type: ActorType,
1483 proc: Proc,
1484 actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
1485 status: watch::Receiver<ActorStatus>,
1486 parent: Option<InstanceCell>,
1487 ports: Arc<dyn Any + Send + Sync>,
1488 ) -> Self {
1489 let _ais = actor_id.to_string();
1490 let cell = Self {
1491 inner: Arc::new(InstanceState {
1492 actor_id: actor_id.clone(),
1493 actor_type,
1494 proc: proc.clone(),
1495 actor_loop,
1496 status,
1497 parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
1498 children: DashMap::new(),
1499 actor_task_handle: OnceLock::new(),
1500 exported_named_ports: DashMap::new(),
1501 num_processed_messages: AtomicU64::new(0),
1502 recording: hyperactor_telemetry::recorder().record(64),
1503 ports,
1504 }),
1505 };
1506 cell.maybe_link_parent();
1507 proc.inner
1508 .instances
1509 .insert(actor_id.clone(), cell.downgrade());
1510 cell
1511 }
1512
1513 fn wrap(inner: Arc<InstanceState>) -> Self {
1514 Self { inner }
1515 }
1516
1517 pub(crate) fn actor_id(&self) -> &ActorId {
1519 &self.inner.actor_id
1520 }
1521
1522 pub(crate) fn pid(&self) -> Index {
1524 self.inner.actor_id.pid()
1525 }
1526
1527 pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1529 self.inner.actor_task_handle.get()
1530 }
1531
1532 pub(crate) fn status(&self) -> &watch::Receiver<ActorStatus> {
1534 &self.inner.status
1535 }
1536
1537 #[allow(clippy::result_large_err)] pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
1540 if let Some((signal_port, _)) = &self.inner.actor_loop {
1541 signal_port.send(signal).map_err(ActorError::from)
1542 } else {
1543 tracing::warn!(
1544 "{}: attempted to send signal {} to detached actor",
1545 self.inner.actor_id,
1546 signal
1547 );
1548 Ok(())
1549 }
1550 }
1551
1552 pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) {
1561 match &self.inner.actor_loop {
1562 Some((_, supervision_port)) => {
1563 if let Err(err) = supervision_port.send(event) {
1564 tracing::error!(
1565 "{}: failed to send supervision event to actor: {:?}. Crash the process.",
1566 self.actor_id(),
1567 err
1568 );
1569 std::process::exit(1);
1570 }
1571 }
1572 None => {
1573 tracing::error!(
1574 "{}: failed: {}: cannot send supervision event to detached actor: crashing",
1575 self.actor_id(),
1576 event,
1577 );
1578 std::process::exit(1);
1579 }
1580 }
1581 }
1582
1583 pub fn downgrade(&self) -> WeakInstanceCell {
1585 WeakInstanceCell {
1586 inner: Arc::downgrade(&self.inner),
1587 }
1588 }
1589
1590 fn link(&self, child: InstanceCell) {
1592 assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1593 self.inner.children.insert(child.pid(), child);
1594 }
1595
1596 fn unlink(&self, child: &InstanceCell) {
1598 assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1599 self.inner.children.remove(&child.pid());
1600 }
1601
1602 fn maybe_link_parent(&self) {
1604 if let Some(parent) = self.inner.parent.upgrade() {
1605 parent.link(self.clone());
1606 }
1607 }
1608
1609 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1612 self.inner.maybe_unlink_parent()
1613 }
1614
1615 fn get_parent_cell(&self) -> Option<InstanceCell> {
1617 self.inner.parent.upgrade()
1618 }
1619
1620 fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, Index, InstanceCell>> {
1623 self.inner.children.iter()
1624 }
1625
1626 fn child_count(&self) -> usize {
1628 self.inner.children.len()
1629 }
1630
1631 fn get_child(&self, pid: Index) -> Option<InstanceCell> {
1633 self.inner.children.get(&pid).map(|child| child.clone())
1634 }
1635
1636 pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &Ports<A>) -> ActorRef<R> {
1639 <R as Binds<A>>::bind(ports);
1640 ports.bind::<Signal>();
1642 ports.bind::<Undeliverable<MessageEnvelope>>();
1643 for entry in ports.bound.iter() {
1645 self.inner
1646 .exported_named_ports
1647 .insert(*entry.key(), entry.value());
1648 }
1649 ActorRef::attest(self.actor_id().clone())
1650 }
1651
1652 pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
1654 let ports = Arc::clone(&self.inner.ports).downcast::<Ports<A>>().ok()?;
1655 Some(ActorHandle::new(self.clone(), ports))
1656 }
1657}
1658
1659impl Drop for InstanceState {
1660 fn drop(&mut self) {
1661 if let Some(parent) = self.maybe_unlink_parent() {
1662 tracing::debug!(
1663 "instance {} was dropped with parent {} still linked",
1664 self.actor_id,
1665 parent.actor_id()
1666 );
1667 }
1668 if self.proc.inner.instances.remove(&self.actor_id).is_none() {
1669 tracing::error!("instance {} was dropped but not in proc", self.actor_id);
1670 }
1671 }
1672}
1673
/// A non-owning reference to an actor instance's state.
///
/// Use [`WeakInstanceCell::upgrade`] to obtain an [`InstanceCell`] while the
/// instance is still alive.
#[derive(Debug, Clone)]
pub struct WeakInstanceCell {
    inner: Weak<InstanceState>,
}
1680
1681impl WeakInstanceCell {
1682 pub fn new() -> Self {
1684 Self { inner: Weak::new() }
1685 }
1686
1687 pub fn upgrade(&self) -> Option<InstanceCell> {
1689 self.inner.upgrade().map(InstanceCell::wrap)
1690 }
1691}
1692
/// Per-actor registry of message ports, keyed by message type.
pub struct Ports<A: Actor> {
    /// Open port handles, type-erased. The entry for message type `M` is keyed
    /// by `TypeId::of::<M>()` and holds a `PortHandle<M>`.
    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
    /// Port indices bound via `bind_to`, mapped to the bound message's type name.
    bound: DashMap<u64, &'static str>,
    /// The actor's mailbox, from which ports are opened.
    mailbox: Mailbox,
    /// Queue onto which handler work for incoming messages is sent.
    workq: mpsc::UnboundedSender<WorkCell<A>>,
}
1703
impl<A: Actor> Ports<A> {
    /// Creates an empty registry that opens ports on `mailbox` and sends
    /// handler work to `workq`.
    fn new(mailbox: Mailbox, workq: mpsc::UnboundedSender<WorkCell<A>>) -> Self {
        Self {
            ports: DashMap::new(),
            bound: DashMap::new(),
            mailbox,
            workq,
        }
    }

    /// Returns the handler port for message type `M`, opening it on first use.
    ///
    /// A newly opened port does two things for each received message. It
    /// wraps the message in a `WorkCell` that invokes
    /// `Instance::handle_message`, and sends that cell to the work queue. It
    /// also increments `ACTOR_MESSAGE_QUEUE_SIZE`, tagged with this actor's id.
    ///
    /// # Panics
    ///
    /// Panics if `M` is `Signal` and no `Signal` port is registered yet. Also
    /// panics if the registered entry for `M` is not a `PortHandle<M>`.
    pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
    where
        A: Handler<M>,
    {
        let key = TypeId::of::<M>();
        match self.ports.entry(key) {
            Entry::Vacant(entry) => {
                // The Signal port must be registered by other means (e.g.
                // `open_message_port`); it is never provisioned here.
                assert_ne!(
                    key,
                    TypeId::of::<Signal>(),
                    "cannot provision Signal port through `Ports::get`"
                );

                let type_info = TypeInfo::get_by_typeid(key);
                let workq = self.workq.clone();
                let actor_id = self.mailbox.actor_id().to_string();
                let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
                    let work = WorkCell::new(move |actor: &mut A, instance: &mut Instance<A>| {
                        Box::pin(async move {
                            // NOTE(review): `handle_message` is presumably an
                            // `unsafe fn`; its safety invariant is not visible
                            // here — document it in a `SAFETY:` comment.
                            unsafe {
                                instance
                                    .handle_message(actor, type_info, headers, msg)
                                    .await
                            }
                        })
                    });
                    ACTOR_MESSAGE_QUEUE_SIZE.add(
                        1,
                        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.clone()),
                    );
                    workq.send(work).map_err(anyhow::Error::from)
                });
                entry.insert(Box::new(port.clone()));
                port
            }
            Entry::Occupied(entry) => {
                // Every entry keyed by `TypeId::of::<M>()` is inserted as a
                // `PortHandle<M>`, so this downcast is expected to succeed.
                let port = entry.get();
                port.downcast_ref::<PortHandle<M>>().unwrap().clone()
            }
        }
    }

    /// Opens a plain port/receiver pair for `M` and registers the handle.
    /// Returns `None` if a port for `M` is already registered.
    pub(crate) fn open_message_port<M: Message>(&self) -> Option<(PortHandle<M>, PortReceiver<M>)> {
        match self.ports.entry(TypeId::of::<M>()) {
            Entry::Vacant(entry) => {
                let (port, receiver) = self.mailbox.open_port();
                entry.insert(Box::new(port.clone()));
                Some((port, receiver))
            }
            Entry::Occupied(_) => None,
        }
    }

    /// Binds the handler port for `M` to `M`'s default port index, `M::port()`.
    pub fn bind<M: RemoteMessage>(&self)
    where
        A: Handler<M>,
    {
        self.bind_to::<M>(M::port());
    }

    /// Binds the handler port for `M` to `port_index` and records the binding.
    /// Re-binding the same type to the same index is a no-op.
    ///
    /// # Panics
    ///
    /// Panics if `port_index` is already bound to a different message type.
    pub fn bind_to<M: RemoteMessage>(&self, port_index: u64)
    where
        A: Handler<M>,
    {
        match self.bound.entry(port_index) {
            Entry::Vacant(entry) => {
                self.get::<M>().bind_to(port_index);
                entry.insert(M::typename());
            }
            Entry::Occupied(entry) => {
                assert_eq!(
                    *entry.get(),
                    M::typename(),
                    "bind {}: port index {} already bound to type {}",
                    M::typename(),
                    port_index,
                    entry.get(),
                );
            }
        }
    }
}
1805
1806#[cfg(test)]
1807mod tests {
1808 use std::assert_matches::assert_matches;
1809 use std::sync::atomic::AtomicBool;
1810
1811 use hyperactor_macros::export;
1812 use maplit::hashmap;
1813 use serde_json::json;
1814 use tokio::sync::Barrier;
1815 use tokio::sync::oneshot;
1816 use tracing::Level;
1817 use tracing_subscriber::layer::SubscriberExt;
1818 use tracing_test::internal::logs_with_scope_contain;
1819
1820 use super::*;
1821 use crate as hyperactor;
1823 use crate::HandleClient;
1824 use crate::Handler;
1825 use crate::OncePortRef;
1826 use crate::PortRef;
1827 use crate::clock::RealClock;
1828 use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
1829 use crate::test_utils::process_assertion::assert_termination;
1830
1831 impl ActorTreeSnapshot {
1832 #[allow(dead_code)]
1833 fn empty(pid: Index) -> Self {
1834 Self {
1835 pid,
1836 type_name: String::new(),
1837 status: ActorStatus::Idle,
1838 stats: ActorStats::default(),
1839 handlers: HashMap::new(),
1840 children: HashMap::new(),
1841 events: Vec::new(),
1842 spans: Vec::new(),
1843 }
1844 }
1845
1846 fn empty_typed(pid: Index, type_name: String) -> Self {
1847 Self {
1848 pid,
1849 type_name,
1850 status: ActorStatus::Idle,
1851 stats: ActorStats::default(),
1852 handlers: HashMap::new(),
1853 children: HashMap::new(),
1854 events: Vec::new(),
1855 spans: Vec::new(),
1856 }
1857 }
1858 }
1859
1860 #[derive(Debug, Default, Actor)]
1861 #[export]
1862 struct TestActor;
1863
1864 #[derive(Handler, HandleClient, Debug)]
1865 enum TestActorMessage {
1866 Reply(oneshot::Sender<()>),
1867 Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
1868 Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
1869 Noop(),
1870 Fail(anyhow::Error),
1871 Panic(String),
1872 Spawn(oneshot::Sender<ActorHandle<TestActor>>),
1873 }
1874
1875 impl TestActor {
1876 async fn spawn_child(parent: &ActorHandle<TestActor>) -> ActorHandle<TestActor> {
1877 let (tx, rx) = oneshot::channel();
1878 parent.send(TestActorMessage::Spawn(tx)).unwrap();
1879 rx.await.unwrap()
1880 }
1881 }
1882
1883 #[async_trait]
1884 #[crate::forward(TestActorMessage)]
1885 impl TestActorMessageHandler for TestActor {
1886 async fn reply(
1887 &mut self,
1888 _cx: &crate::Context<Self>,
1889 sender: oneshot::Sender<()>,
1890 ) -> Result<(), anyhow::Error> {
1891 sender.send(()).unwrap();
1892 Ok(())
1893 }
1894
1895 async fn wait(
1896 &mut self,
1897 _cx: &crate::Context<Self>,
1898 sender: oneshot::Sender<()>,
1899 receiver: oneshot::Receiver<()>,
1900 ) -> Result<(), anyhow::Error> {
1901 sender.send(()).unwrap();
1902 receiver.await.unwrap();
1903 Ok(())
1904 }
1905
1906 async fn forward(
1907 &mut self,
1908 _cx: &crate::Context<Self>,
1909 destination: ActorHandle<TestActor>,
1910 message: Box<TestActorMessage>,
1911 ) -> Result<(), anyhow::Error> {
1912 destination.send(*message)?;
1914 Ok(())
1915 }
1916
1917 async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
1918 Ok(())
1919 }
1920
1921 async fn fail(
1922 &mut self,
1923 _cx: &crate::Context<Self>,
1924 err: anyhow::Error,
1925 ) -> Result<(), anyhow::Error> {
1926 Err(err)
1927 }
1928
1929 async fn panic(
1930 &mut self,
1931 _cx: &crate::Context<Self>,
1932 err_msg: String,
1933 ) -> Result<(), anyhow::Error> {
1934 panic!("{}", err_msg);
1935 }
1936
1937 async fn spawn(
1938 &mut self,
1939 cx: &crate::Context<Self>,
1940 reply: oneshot::Sender<ActorHandle<TestActor>>,
1941 ) -> Result<(), anyhow::Error> {
1942 let handle = <Self as Actor>::spawn(cx, ()).await?;
1943 reply.send(handle).unwrap();
1944 Ok(())
1945 }
1946 }
1947
1948 #[tracing_test::traced_test]
1949 #[tokio::test]
1950 async fn test_spawn_actor() {
1951 let proc = Proc::local();
1952 let handle = proc.spawn::<TestActor>("test", ()).await.unwrap();
1953
1954 assert!(logs_contain(
1956 format!(
1957 "{}: spawned with {:?}",
1958 handle.actor_id(),
1959 handle.cell().actor_task_handle().unwrap(),
1960 )
1961 .as_str()
1962 ));
1963
1964 let mut state = handle.status().clone();
1965
1966 let (tx, rx) = oneshot::channel::<()>();
1969 handle.send(TestActorMessage::Reply(tx)).unwrap();
1970 rx.await.unwrap();
1971
1972 state
1973 .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
1974 .await
1975 .unwrap();
1976
1977 let (enter_tx, enter_rx) = oneshot::channel::<()>();
1979 let (exit_tx, exit_rx) = oneshot::channel::<()>();
1980
1981 handle
1982 .send(TestActorMessage::Wait(enter_tx, exit_rx))
1983 .unwrap();
1984 enter_rx.await.unwrap();
1985 assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= RealClock.system_time_now());
1986 exit_tx.send(()).unwrap();
1987
1988 state
1989 .wait_for(|state| matches!(*state, ActorStatus::Idle))
1990 .await
1991 .unwrap();
1992
1993 handle.drain_and_stop().unwrap();
1994 handle.await;
1995 assert_matches!(*state.borrow(), ActorStatus::Stopped);
1996 }
1997
1998 #[tokio::test]
1999 async fn test_proc_actors_messaging() {
2000 let proc = Proc::local();
2001 let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
2002 let second = proc.spawn::<TestActor>("second", ()).await.unwrap();
2003 let (tx, rx) = oneshot::channel::<()>();
2004 let reply_message = TestActorMessage::Reply(tx);
2005 first
2006 .send(TestActorMessage::Forward(second, Box::new(reply_message)))
2007 .unwrap();
2008 rx.await.unwrap();
2009 }
2010
2011 #[derive(Debug, Default, Actor)]
2012 struct LookupTestActor;
2013
2014 #[derive(Handler, HandleClient, Debug)]
2015 enum LookupTestMessage {
2016 ActorExists(ActorRef<TestActor>, #[reply] OncePortRef<bool>),
2017 }
2018
2019 #[async_trait]
2020 #[crate::forward(LookupTestMessage)]
2021 impl LookupTestMessageHandler for LookupTestActor {
2022 async fn actor_exists(
2023 &mut self,
2024 cx: &crate::Context<Self>,
2025 actor_ref: ActorRef<TestActor>,
2026 ) -> Result<bool, anyhow::Error> {
2027 Ok(actor_ref.downcast_handle(cx).is_some())
2028 }
2029 }
2030
2031 #[tokio::test]
2032 async fn test_actor_lookup() {
2033 let proc = Proc::local();
2034 let client = proc.attach("client").unwrap();
2035
2036 let target_actor = proc.spawn::<TestActor>("target", ()).await.unwrap();
2037 let target_actor_ref = target_actor.bind();
2038 let lookup_actor = proc.spawn::<LookupTestActor>("lookup", ()).await.unwrap();
2039
2040 assert!(
2041 lookup_actor
2042 .actor_exists(&client, target_actor_ref.clone())
2043 .await
2044 .unwrap()
2045 );
2046
2047 assert!(
2049 !lookup_actor
2050 .actor_exists(
2051 &client,
2052 ActorRef::attest(target_actor.actor_id().child_id(123).clone())
2053 )
2054 .await
2055 .unwrap()
2056 );
2057 assert!(
2059 !lookup_actor
2060 .actor_exists(&client, ActorRef::attest(lookup_actor.actor_id().clone()))
2061 .await
2062 .unwrap()
2063 );
2064
2065 target_actor.drain_and_stop().unwrap();
2066 target_actor.await;
2067
2068 assert!(
2069 !lookup_actor
2070 .actor_exists(&client, target_actor_ref)
2071 .await
2072 .unwrap()
2073 );
2074
2075 lookup_actor.drain_and_stop().unwrap();
2076 lookup_actor.await;
2077 }
2078
2079 fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
2080 assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id());
2081 assert_eq!(
2082 child.inner.parent.upgrade().unwrap().actor_id(),
2083 parent.actor_id()
2084 );
2085 assert_matches!(
2086 parent.inner.children.get(&child.pid()),
2087 Some(node) if node.actor_id() == child.actor_id()
2088 );
2089 }
2090
2091 #[tracing_test::traced_test]
2092 #[tokio::test]
2093 async fn test_spawn_child() {
2094 let proc = Proc::local();
2095
2096 let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
2097 let second = TestActor::spawn_child(&first).await;
2098 let third = TestActor::spawn_child(&second).await;
2099
2100 assert!(logs_with_scope_contain(
2102 "hyperactor::proc",
2103 format!(
2104 "{}: spawned with {:?}",
2105 first.actor_id(),
2106 first.cell().actor_task_handle().unwrap()
2107 )
2108 .as_str()
2109 ));
2110 assert!(logs_with_scope_contain(
2111 "hyperactor::proc",
2112 format!(
2113 "{}: spawned with {:?}",
2114 second.actor_id(),
2115 second.cell().actor_task_handle().unwrap()
2116 )
2117 .as_str()
2118 ));
2119 assert!(logs_with_scope_contain(
2120 "hyperactor::proc",
2121 format!(
2122 "{}: spawned with {:?}",
2123 third.actor_id(),
2124 third.cell().actor_task_handle().unwrap()
2125 )
2126 .as_str()
2127 ));
2128
2129 assert_eq!(first.actor_id().proc_id(), proc.proc_id());
2131 assert_eq!(second.actor_id(), &first.actor_id().child_id(1));
2132 assert_eq!(third.actor_id(), &first.actor_id().child_id(2));
2133
2134 validate_link(third.cell(), second.cell());
2136 validate_link(second.cell(), first.cell());
2137 assert!(first.cell().inner.parent.upgrade().is_none());
2138
2139 third.drain_and_stop().unwrap();
2141 third.await;
2142 assert!(second.cell().inner.children.is_empty());
2143 validate_link(second.cell(), first.cell());
2144
2145 second.drain_and_stop().unwrap();
2146 second.await;
2147 assert!(first.cell().inner.children.is_empty());
2148 }
2149
2150 #[tokio::test]
2151 async fn test_child_lifecycle() {
2152 let proc = Proc::local();
2153
2154 let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2155 let root_1 = TestActor::spawn_child(&root).await;
2156 let root_2 = TestActor::spawn_child(&root).await;
2157 let root_2_1 = TestActor::spawn_child(&root_2).await;
2158
2159 root.drain_and_stop().unwrap();
2160 root.await;
2161
2162 for actor in [root_1, root_2, root_2_1] {
2163 assert!(actor.send(TestActorMessage::Noop()).is_err());
2164 assert_matches!(actor.await, ActorStatus::Stopped);
2165 }
2166 }
2167
2168 #[tokio::test]
2169 async fn test_parent_failure() {
2170 let proc = Proc::local();
2171 ProcSupervisionCoordinator::set(&proc).await.unwrap();
2174
2175 let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2176 let root_1 = TestActor::spawn_child(&root).await;
2177 let root_2 = TestActor::spawn_child(&root).await;
2178 let root_2_1 = TestActor::spawn_child(&root_2).await;
2179
2180 root_2
2181 .send(TestActorMessage::Fail(anyhow::anyhow!(
2182 "some random failure"
2183 )))
2184 .unwrap();
2185 let root_2_actor_id = root_2.actor_id().clone();
2186 assert_matches!(
2187 root_2.await,
2188 ActorStatus::Failed(err) if err == format!("serving {}: processing error: some random failure", root_2_actor_id)
2189 );
2190
2191 assert_eq!(
2194 root.await,
2195 ActorStatus::Failed("did not handle supervision event".to_string())
2196 );
2197 assert_eq!(root_2_1.await, ActorStatus::Stopped);
2198 assert_eq!(root_1.await, ActorStatus::Stopped);
2199 }
2200
2201 #[tokio::test]
2202 async fn test_actor_ledger() {
2203 async fn wait_until_idle(actor_handle: &ActorHandle<TestActor>) {
2204 actor_handle
2205 .status()
2206 .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
2207 .await
2208 .unwrap();
2209 }
2210
2211 let proc = Proc::local();
2212
2213 let root: ActorHandle<TestActor> = proc.spawn::<TestActor>("root", ()).await.unwrap();
2215 wait_until_idle(&root).await;
2216 {
2217 let snapshot = proc.state().ledger.snapshot();
2218 assert_eq!(
2219 snapshot.roots,
2220 hashmap! {
2221 root.actor_id().clone() =>
2222 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2223 },
2224 );
2225 }
2226
2227 let another_root: ActorHandle<TestActor> =
2229 proc.spawn::<TestActor>("another_root", ()).await.unwrap();
2230 wait_until_idle(&another_root).await;
2231 {
2232 let snapshot = proc.state().ledger.snapshot();
2233 assert_eq!(
2234 snapshot.roots,
2235 hashmap! {
2236 root.actor_id().clone() =>
2237 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2238 another_root.actor_id().clone() =>
2239 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2240 },
2241 );
2242 }
2243
2244 another_root.drain_and_stop().unwrap();
2247 another_root.await;
2248 {
2249 let snapshot = proc.state().ledger.snapshot();
2250 assert_eq!(
2251 snapshot.roots,
2252 hashmap! { root.actor_id().clone() =>
2253 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2254 },
2255 );
2256 }
2257
2258 let root_1 = TestActor::spawn_child(&root).await;
2264 wait_until_idle(&root_1).await;
2265 {
2266 let snapshot = proc.state().ledger.snapshot();
2267 assert_eq!(
2268 snapshot.roots,
2269 hashmap! {
2270 root.actor_id().clone() => ActorTreeSnapshot {
2271 pid: 0,
2272 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2273 status: ActorStatus::Idle,
2274 stats: ActorStats { num_processed_messages: 1 },
2275 handlers: HashMap::new(),
2276 children: hashmap! {
2277 root_1.actor_id().pid() =>
2278 ActorTreeSnapshot::empty_typed(
2279 root_1.actor_id().pid(),
2280 "hyperactor::proc::tests::TestActor".to_string()
2281 )
2282 },
2283 events: Vec::new(),
2284 spans: Vec::new(),
2285 }
2286 },
2287 );
2288 }
2289
2290 let root_1_1 = TestActor::spawn_child(&root_1).await;
2291 wait_until_idle(&root_1_1).await;
2292 {
2293 let snapshot = proc.state().ledger.snapshot();
2294 assert_eq!(
2295 snapshot.roots,
2296 hashmap! {
2297 root.actor_id().clone() => ActorTreeSnapshot {
2298 pid: 0,
2299 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2300 status: ActorStatus::Idle,
2301 stats: ActorStats { num_processed_messages: 1 },
2302 handlers: HashMap::new(),
2303 children: hashmap!{
2304 root_1.actor_id().pid() =>
2305 ActorTreeSnapshot {
2306 pid: root_1.actor_id().pid(),
2307 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2308 status: ActorStatus::Idle,
2309 stats: ActorStats { num_processed_messages: 1 },
2310 handlers: HashMap::new(),
2311 children: hashmap!{
2312 root_1_1.actor_id().pid() =>
2313 ActorTreeSnapshot::empty_typed(
2314 root_1_1.actor_id().pid(),
2315 "hyperactor::proc::tests::TestActor".to_string()
2316 )
2317 },
2318 events: Vec::new(),
2319 spans: Vec::new(),
2320 }
2321 },
2322 events: Vec::new(),
2323 spans: Vec::new(),
2324 },
2325 }
2326 );
2327 }
2328
2329 let root_2 = TestActor::spawn_child(&root).await;
2330 wait_until_idle(&root_2).await;
2331 {
2332 let snapshot = proc.state().ledger.snapshot();
2333 assert_eq!(
2334 snapshot.roots,
2335 hashmap! {
2336 root.actor_id().clone() => ActorTreeSnapshot {
2337 pid: 0,
2338 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2339 status: ActorStatus::Idle,
2340 stats: ActorStats { num_processed_messages: 2 },
2341 handlers: HashMap::new(),
2342 children: hashmap!{
2343 root_2.actor_id().pid() =>
2344 ActorTreeSnapshot{
2345 pid: root_2.actor_id().pid(),
2346 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2347 status: ActorStatus::Idle,
2348 stats: ActorStats::default(),
2349 handlers: HashMap::new(),
2350 children: HashMap::new(),
2351 events: Vec::new(),
2352 spans: Vec::new(),
2353 },
2354 root_1.actor_id().pid() =>
2355 ActorTreeSnapshot{
2356 pid: root_1.actor_id().pid(),
2357 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2358 status: ActorStatus::Idle,
2359 stats: ActorStats { num_processed_messages: 1 },
2360 handlers: HashMap::new(),
2361 children: hashmap!{
2362 root_1_1.actor_id().pid() =>
2363 ActorTreeSnapshot::empty_typed(
2364 root_1_1.actor_id().pid(),
2365 "hyperactor::proc::tests::TestActor".to_string()
2366 )
2367 },
2368 events: Vec::new(),
2369 spans: Vec::new(),
2370 },
2371 },
2372 events: Vec::new(),
2373 spans: Vec::new(),
2374 },
2375 }
2376 );
2377 }
2378
2379 root_1.drain_and_stop().unwrap();
2381 root_1.await;
2382 {
2383 let snapshot = proc.state().ledger.snapshot();
2384 assert_eq!(
2385 snapshot.roots,
2386 hashmap! {
2387 root.actor_id().clone() => ActorTreeSnapshot {
2388 pid: 0,
2389 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2390 status: ActorStatus::Idle,
2391 stats: ActorStats { num_processed_messages: 3 },
2392 handlers: HashMap::new(),
2393 children: hashmap!{
2394 root_2.actor_id().pid() =>
2395 ActorTreeSnapshot {
2396 pid: root_2.actor_id().pid(),
2397 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2398 status: ActorStatus::Idle,
2399 stats: ActorStats::default(),
2400 handlers: HashMap::new(),
2401 children: HashMap::new(),
2402 events: Vec::new(),
2403 spans: Vec::new(),
2404 }
2405 },
2406 events: Vec::new(),
2407 spans: Vec::new(),
2408 },
2409 }
2410 );
2411 }
2412
2413 root.drain_and_stop().unwrap();
2415 root.await;
2416 {
2417 let snapshot = proc.state().ledger.snapshot();
2418 assert_eq!(snapshot.roots, hashmap! {});
2419 }
2420 }
2421
2422 #[tokio::test]
2423 async fn test_multi_handler() {
2424 #[derive(Debug)]
2428 struct TestActor(Arc<AtomicUsize>);
2429
2430 #[async_trait]
2431 impl Actor for TestActor {
2432 type Params = Arc<AtomicUsize>;
2433
2434 async fn new(param: Arc<AtomicUsize>) -> Result<Self, anyhow::Error> {
2435 Ok(Self(param))
2436 }
2437 }
2438
2439 #[async_trait]
2440 impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
2441 async fn handle(
2442 &mut self,
2443 cx: &crate::Context<Self>,
2444 message: OncePortHandle<PortHandle<usize>>,
2445 ) -> anyhow::Result<()> {
2446 message.send(cx.port())?;
2447 Ok(())
2448 }
2449 }
2450
2451 #[async_trait]
2452 impl Handler<usize> for TestActor {
2453 async fn handle(
2454 &mut self,
2455 _cx: &crate::Context<Self>,
2456 message: usize,
2457 ) -> anyhow::Result<()> {
2458 self.0.fetch_add(message, Ordering::SeqCst);
2459 Ok(())
2460 }
2461 }
2462
2463 let proc = Proc::local();
2464 let state = Arc::new(AtomicUsize::new(0));
2465 let handle = proc
2466 .spawn::<TestActor>("test", state.clone())
2467 .await
2468 .unwrap();
2469 let client = proc.attach("client").unwrap();
2470 let (tx, rx) = client.open_once_port();
2471 handle.send(tx).unwrap();
2472 let usize_handle = rx.recv().await.unwrap();
2473 usize_handle.send(123).unwrap();
2474
2475 handle.drain_and_stop().unwrap();
2476 handle.await;
2477
2478 assert_eq!(state.load(Ordering::SeqCst), 123);
2479 }
2480
2481 #[tokio::test]
2482 async fn test_actor_panic() {
2483 panic_handler::set_panic_hook();
2485
2486 let proc = Proc::local();
2487 ProcSupervisionCoordinator::set(&proc).await.unwrap();
2490
2491 let client = proc.attach("client").unwrap();
2492 let actor_handle = proc.spawn::<TestActor>("test", ()).await.unwrap();
2493 actor_handle
2494 .panic(&client, "some random failure".to_string())
2495 .await
2496 .unwrap();
2497 let actor_status = actor_handle.await;
2498
2499 assert_matches!(actor_status, ActorStatus::Failed(_));
2503 if let ActorStatus::Failed(err) = actor_status {
2504 let error_msg = err.to_string();
2505 assert!(error_msg.contains("some random failure"));
2507 assert!(error_msg.contains("rust_begin_unwind"));
2511 }
2512 }
2513
2514 #[tokio::test]
2515 async fn test_local_supervision_propagation() {
2516 #[derive(Debug)]
2517 struct TestActor(Arc<AtomicBool>, bool);
2518
2519 #[async_trait]
2520 impl Actor for TestActor {
2521 type Params = (Arc<AtomicBool>, bool);
2522
2523 async fn new(param: (Arc<AtomicBool>, bool)) -> Result<Self, anyhow::Error> {
2524 Ok(Self(param.0, param.1))
2525 }
2526
2527 async fn handle_supervision_event(
2528 &mut self,
2529 _this: &Instance<Self>,
2530 _event: &ActorSupervisionEvent,
2531 ) -> Result<bool, anyhow::Error> {
2532 if !self.1 {
2533 return Ok(false);
2534 }
2535
2536 tracing::error!(
2537 "{}: supervision event received: {:?}",
2538 _this.self_id(),
2539 _event
2540 );
2541 self.0.store(true, Ordering::SeqCst);
2542 Ok(true)
2543 }
2544 }
2545
2546 #[async_trait]
2547 impl Handler<String> for TestActor {
2548 async fn handle(
2549 &mut self,
2550 cx: &crate::Context<Self>,
2551 message: String,
2552 ) -> anyhow::Result<()> {
2553 tracing::info!("{} received message: {}", cx.self_id(), message);
2554 Err(anyhow::anyhow!(message))
2555 }
2556 }
2557
2558 let proc = Proc::local();
2559 let reported_event = ProcSupervisionCoordinator::set(&proc).await.unwrap();
2560
2561 let root_state = Arc::new(AtomicBool::new(false));
2562 let root_1_state = Arc::new(AtomicBool::new(false));
2563 let root_1_1_state = Arc::new(AtomicBool::new(false));
2564 let root_1_1_1_state = Arc::new(AtomicBool::new(false));
2565 let root_2_state = Arc::new(AtomicBool::new(false));
2566 let root_2_1_state = Arc::new(AtomicBool::new(false));
2567
2568 let root = proc
2569 .spawn::<TestActor>("root", (root_state.clone(), false))
2570 .await
2571 .unwrap();
2572 let root_1 = proc
2573 .spawn_child::<TestActor>(
2574 root.cell().clone(),
2575 (
2576 root_1_state.clone(),
2577 true, ),
2579 )
2580 .await
2581 .unwrap();
2582 let root_1_1 = proc
2583 .spawn_child::<TestActor>(root_1.cell().clone(), (root_1_1_state.clone(), false))
2584 .await
2585 .unwrap();
2586 let root_1_1_1 = proc
2587 .spawn_child::<TestActor>(root_1_1.cell().clone(), (root_1_1_1_state.clone(), false))
2588 .await
2589 .unwrap();
2590 let root_2 = proc
2591 .spawn_child::<TestActor>(root.cell().clone(), (root_2_state.clone(), false))
2592 .await
2593 .unwrap();
2594 let root_2_1 = proc
2595 .spawn_child::<TestActor>(root_2.cell().clone(), (root_2_1_state.clone(), false))
2596 .await
2597 .unwrap();
2598
2599 root_1_1_1
2602 .send::<String>("some random failure".into())
2603 .unwrap();
2604
2605 root_2_1
2608 .send::<String>("some random failure".into())
2609 .unwrap();
2610
2611 RealClock.sleep(Duration::from_secs(1)).await;
2612
2613 assert!(!root_state.load(Ordering::SeqCst));
2614 assert!(root_1_state.load(Ordering::SeqCst));
2615 assert!(!root_1_1_state.load(Ordering::SeqCst));
2616 assert!(!root_1_1_1_state.load(Ordering::SeqCst));
2617 assert!(!root_2_state.load(Ordering::SeqCst));
2618 assert!(!root_2_1_state.load(Ordering::SeqCst));
2619 assert_eq!(
2620 reported_event.event().map(|e| e.actor_id.clone()),
2621 Some(root.actor_id().clone())
2622 );
2623 }
2624
2625 #[tokio::test]
2626 async fn test_supervision_event_handler_propagates() {
2627 #[derive(Debug)]
2628 struct FailingSupervisionActor;
2629
2630 #[async_trait]
2631 impl Actor for FailingSupervisionActor {
2632 type Params = ();
2633
2634 async fn new(_: ()) -> Result<Self, anyhow::Error> {
2635 Ok(Self)
2636 }
2637
2638 async fn handle_supervision_event(
2639 &mut self,
2640 _this: &Instance<Self>,
2641 _event: &ActorSupervisionEvent,
2642 ) -> Result<bool, anyhow::Error> {
2643 anyhow::bail!("failed to handle supervision event!")
2644 }
2645 }
2646
2647 #[async_trait]
2648 impl Handler<String> for FailingSupervisionActor {
2649 async fn handle(
2650 &mut self,
2651 _cx: &crate::Context<Self>,
2652 message: String,
2653 ) -> anyhow::Result<()> {
2654 Err(anyhow::anyhow!(message))
2655 }
2656 }
2657
2658 #[derive(Debug)]
2659 struct ParentActor(tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>);
2660
2661 #[async_trait]
2662 impl Actor for ParentActor {
2663 type Params = tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>;
2664
2665 async fn new(
2666 supervision_events: tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>,
2667 ) -> Result<Self, anyhow::Error> {
2668 Ok(Self(supervision_events))
2669 }
2670
2671 async fn handle_supervision_event(
2672 &mut self,
2673 _this: &Instance<Self>,
2674 event: &ActorSupervisionEvent,
2675 ) -> Result<bool, anyhow::Error> {
2676 self.0.send(event.clone()).unwrap();
2677 Ok(true)
2678 }
2679 }
2680
2681 let proc = Proc::local();
2682
2683 let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();
2684
2685 let parent = proc.spawn::<ParentActor>("parent", event_tx).await.unwrap();
2686 let child = proc
2687 .spawn_child::<FailingSupervisionActor>(parent.cell().clone(), ())
2688 .await
2689 .unwrap();
2690 let grandchild = proc
2691 .spawn_child::<FailingSupervisionActor>(child.cell().clone(), ())
2692 .await
2693 .unwrap();
2694
2695 let child_actor_id = child.actor_id().clone();
2696 let grandchild_actor_id = grandchild.actor_id().clone();
2697
2698 grandchild.send("trigger failure".to_string()).unwrap();
2701
2702 assert!(grandchild.await.is_failed());
2703 assert!(child.await.is_failed());
2704
2705 assert_eq!(
2706 event_rx.recv().await.unwrap(),
2707 ActorSupervisionEvent {
2708 actor_id: child_actor_id,
2709 actor_status: ActorStatus::Failed(
2710 "failed to handle supervision event: failed to handle supervision event!"
2711 .to_string()
2712 ),
2713 message_headers: None,
2714 caused_by: Some(Box::new(ActorSupervisionEvent {
2715 actor_id: grandchild_actor_id,
2716 actor_status: ActorStatus::Failed(
2717 "serving local[0].parent[2]: processing error: trigger failure".to_string()
2718 ),
2719 message_headers: None,
2720 caused_by: None,
2721 })),
2722 }
2723 );
2724
2725 assert!(event_rx.try_recv().is_err());
2726 }
2727
2728 #[tokio::test]
2729 async fn test_instance() {
2730 #[derive(Debug, Default, Actor)]
2731 struct TestActor;
2732
2733 #[async_trait]
2734 impl Handler<(String, PortRef<String>)> for TestActor {
2735 async fn handle(
2736 &mut self,
2737 cx: &crate::Context<Self>,
2738 (message, port): (String, PortRef<String>),
2739 ) -> anyhow::Result<()> {
2740 port.send(cx, message)?;
2741 Ok(())
2742 }
2743 }
2744
2745 let proc = Proc::local();
2746
2747 let (instance, handle) = proc.instance("my_test_actor").unwrap();
2748
2749 let child_actor = TestActor::spawn(&instance, ()).await.unwrap();
2750
2751 let (port, mut receiver) = instance.open_port();
2752 child_actor
2753 .send(("hello".to_string(), port.bind()))
2754 .unwrap();
2755
2756 let message = receiver.recv().await.unwrap();
2757 assert_eq!(message, "hello");
2758
2759 child_actor.drain_and_stop().unwrap();
2760 child_actor.await;
2761
2762 assert_eq!(*handle.status().borrow(), ActorStatus::Client);
2763 drop(instance);
2764 assert_eq!(*handle.status().borrow(), ActorStatus::Stopped);
2765 handle.await;
2766 }
2767
2768 #[tokio::test]
2769 async fn test_proc_terminate_without_coordinator() {
2770 if std::env::var("CARGO_TEST").is_ok() {
2771 eprintln!("test skipped as it hangs when run by cargo in sandcastle");
2772 return;
2773 }
2774
2775 let process = async {
2776 let proc = Proc::local();
2777 let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2781 let client = proc.attach("client").unwrap();
2782 root.fail(&client, anyhow::anyhow!("some random failure"))
2783 .await
2784 .unwrap();
2785 RealClock.sleep(Duration::from_secs(30)).await;
2789 };
2790
2791 assert_termination(|| process, 1).await.unwrap();
2792 }
2793
2794 fn trace_and_block(fut: impl Future) {
2795 tracing::subscriber::with_default(
2796 tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
2797 || {
2798 tokio::runtime::Builder::new_current_thread()
2799 .enable_all()
2800 .build()
2801 .unwrap()
2802 .block_on(fut)
2803 },
2804 );
2805 }
2806
#[ignore = "until trace recording is turned back on"]
#[test]
fn test_handler_logging() {
    /// Actor whose handlers emit tracing events and spans so that the
    /// per-actor recording can be inspected.
    #[derive(Debug, Default, Actor)]
    struct LoggingActor;

    impl LoggingActor {
        /// Sends the actor a two-party barrier and waits on it. The wait
        /// returns only once the actor's `Arc<Barrier>` handler runs.
        /// Assuming in-order delivery, earlier messages have been handled by then.
        async fn wait(handle: &ActorHandle<Self>) {
            let rendezvous = Arc::new(Barrier::new(2));
            handle.send(Arc::clone(&rendezvous)).unwrap();
            rendezvous.wait().await;
        }
    }

    #[async_trait]
    impl Handler<String> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            text: String,
        ) -> anyhow::Result<()> {
            tracing::info!("{}", text);
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<u64> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            value: u64,
        ) -> anyhow::Result<()> {
            tracing::event!(Level::INFO, number = value);
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<Arc<Barrier>> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            barrier: Arc<Barrier>,
        ) -> anyhow::Result<()> {
            barrier.wait().await;
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            barriers: Arc<(Barrier, Barrier)>,
        ) -> anyhow::Result<()> {
            let (parked, release) = &*barriers;
            // NOTE(review): the entered-span guard is held across `.await`,
            // which the `tracing` docs warn against in async code. This is
            // presumably intentional, so that "child_span" is on the recorded
            // stack while the handler is parked between the two barriers. Confirm.
            let span = tracing::span!(Level::INFO, "child_span");
            let _entered_span = span.enter();
            parked.wait().await;
            release.wait().await;
            Ok(())
        }
    }

    trace_and_block(async {
        let handle = LoggingActor::spawn_detached(()).await.unwrap();
        handle.send("hello world".to_string()).unwrap();
        handle.send("hello world again".to_string()).unwrap();
        handle.send(123u64).unwrap();

        LoggingActor::wait(&handle).await;

        // Exactly one recorded event per logging message, in send order.
        let events = handle.cell().inner.recording.tail();
        let expected = [
            json!({ "message": "hello world" }),
            json!({ "message": "hello world again" }),
            json!({ "number": 123 }),
        ];
        assert_eq!(events.len(), expected.len());
        for (idx, want) in expected.iter().enumerate() {
            assert_eq!(events[idx].json_value(), *want);
        }

        // Park the actor inside "child_span", snapshot the recorded span
        // stacks while it waits, then let the handler finish.
        let barriers = Arc::new((Barrier::new(2), Barrier::new(2)));
        handle.send(Arc::clone(&barriers)).unwrap();
        barriers.0.wait().await;
        let stacks = handle.cell().inner.recording.stacks();
        barriers.1.wait().await;

        assert_eq!(stacks.len(), 1);
        assert_eq!(stacks[0].len(), 1);
        assert_eq!(stacks[0][0].name(), "child_span");
    })
}
2902}