1use std::any::Any;
39use std::any::TypeId;
40use std::collections::HashMap;
41use std::fmt;
42use std::future::Future;
43use std::ops::Deref;
44use std::panic;
45use std::panic::AssertUnwindSafe;
46use std::panic::Location;
47use std::pin::Pin;
48use std::sync::Arc;
49use std::sync::OnceLock;
50use std::sync::RwLock;
51use std::sync::Weak;
52use std::sync::atomic::AtomicBool;
53use std::sync::atomic::AtomicU64;
54use std::sync::atomic::AtomicUsize;
55use std::sync::atomic::Ordering;
56use std::time::Duration;
57use std::time::Instant;
58use std::time::SystemTime;
59
60use async_trait::async_trait;
61use dashmap::DashMap;
62use dashmap::mapref::entry::Entry;
63use dashmap::mapref::multiple::RefMulti;
64use futures::FutureExt;
65use hyperactor_config::Flattrs;
66use hyperactor_telemetry::ActorStatusEvent;
67use hyperactor_telemetry::generate_actor_status_event_id;
68use hyperactor_telemetry::hash_to_u64;
69use hyperactor_telemetry::notify_actor_status_changed;
70use hyperactor_telemetry::notify_message;
71use hyperactor_telemetry::notify_message_status;
72use hyperactor_telemetry::recorder::Recording;
73use tokio::sync::mpsc;
74use tokio::sync::watch;
75use tokio::task::JoinHandle;
76use tracing::Instrument;
77use tracing::Span;
78use typeuri::Named;
79use uuid::Uuid;
80use wirevalue::TypeInfo;
81
82use crate as hyperactor;
83use crate::Actor;
84use crate::Handler;
85use crate::Message;
86use crate::RemoteMessage;
87use crate::actor::ActorError;
88use crate::actor::ActorErrorKind;
89use crate::actor::ActorHandle;
90use crate::actor::ActorStatus;
91use crate::actor::Binds;
92use crate::actor::HandlerInfo;
93use crate::actor::Referable;
94use crate::actor::RemoteHandles;
95use crate::actor::Signal;
96use crate::actor_local::ActorLocalStorage;
97use crate::channel;
98use crate::channel::ChannelAddr;
99use crate::channel::ChannelError;
100use crate::channel::ChannelTransport;
101use crate::config;
102use crate::context;
103use crate::context::Mailbox as _;
104use crate::introspect::IntrospectMessage;
105use crate::introspect::IntrospectResult;
106use crate::mailbox::BoxedMailboxSender;
107use crate::mailbox::DeliveryError;
108use crate::mailbox::DialMailboxRouter;
109use crate::mailbox::IntoBoxedMailboxSender as _;
110use crate::mailbox::Mailbox;
111use crate::mailbox::MailboxMuxer;
112use crate::mailbox::MailboxSender;
113use crate::mailbox::MailboxServer as _;
114use crate::mailbox::MessageEnvelope;
115use crate::mailbox::OncePortHandle;
116use crate::mailbox::OncePortReceiver;
117use crate::mailbox::PanickingMailboxSender;
118use crate::mailbox::PortHandle;
119use crate::mailbox::PortReceiver;
120use crate::mailbox::Undeliverable;
121use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
122use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
123use crate::metrics::ACTOR_MESSAGES_RECEIVED;
124use crate::ordering::OrderedSender;
125use crate::ordering::OrderedSenderError;
126use crate::ordering::SEQ_INFO;
127use crate::ordering::SeqInfo;
128use crate::ordering::Sequencer;
129use crate::ordering::ordered_channel;
130use crate::panic_handler;
131use crate::reference;
132use crate::supervision::ActorSupervisionEvent;
133
/// Process-wide counter that [`Proc::local`] draws from to build the
/// `local_<n>` name of each local proc it creates.
static NEXT_LOCAL_RANK: AtomicUsize = AtomicUsize::new(0);
136
/// A handle to a proc.
///
/// Cloning a `Proc` clones the inner [`Arc`], so every clone refers to the
/// same underlying `ProcState`.
#[derive(Clone)]
pub struct Proc {
    /// State shared by all clones of this handle.
    inner: Arc<ProcState>,
}
147
148impl fmt::Debug for Proc {
149 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150 f.debug_struct("Proc")
151 .field("proc_id", &self.inner.proc_id)
152 .finish()
153 }
154}
155
/// The state behind a [`Proc`] handle.
struct ProcState {
    /// The id of this proc.
    proc_id: reference::ProcId,

    /// Muxer that mailboxes are bound to; envelopes destined for this proc
    /// are posted here.
    proc_muxer: MailboxMuxer,

    /// Sender that receives envelopes whose destination is a different proc.
    forwarder: BoxedMailboxSender,

    /// Root actor names allocated so far. The value is the counter from which
    /// child pids under that root are handed out (it starts at 1).
    roots: DashMap<String, AtomicUsize>,

    /// Weak references to instance cells, keyed by actor id.
    instances: DashMap<reference::ActorId, WeakInstanceCell>,

    /// Introspection results keyed by actor id, exposed through
    /// `Proc::terminated_snapshot`.
    // NOTE(review): the code that populates this map is not in this part of
    // the file; presumably it is written when an actor terminates — confirm.
    terminated_snapshots: DashMap<reference::ActorId, crate::introspect::IntrospectResult>,

    /// Port that unhandled supervision events are sent to. It can be set at
    /// most once (see `Proc::set_supervision_coordinator`).
    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,
}
187
188impl Drop for ProcState {
189 fn drop(&mut self) {
190 tracing::info!(
194 proc_id = %self.proc_id,
195 name = "ProcStatus",
196 status = "Dropped"
197 );
198 }
199}
200
/// The parts returned by [`Proc::actor_instance`]: an [`Instance`] together
/// with its handle and the receivers that were created alongside it.
pub struct ActorInstance<A: Actor> {
    /// The instance itself.
    pub instance: Instance<A>,
    /// A handle built from the instance's cell and ports.
    pub handle: ActorHandle<A>,
    /// Receiver for supervision events delivered to the instance.
    pub supervision: PortReceiver<ActorSupervisionEvent>,
    /// Receiver for [`Signal`]s delivered to the instance.
    pub signal: PortReceiver<Signal>,
    /// Receiver of the [`WorkCell`]s queued for the instance.
    pub work: mpsc::UnboundedReceiver<WorkCell<A>>,
}
217
218impl Proc {
219 pub fn configured(proc_id: reference::ProcId, forwarder: BoxedMailboxSender) -> Self {
221 tracing::info!(
222 proc_id = %proc_id,
223 name = "ProcStatus",
224 status = "Created"
225 );
226
227 Self {
228 inner: Arc::new(ProcState {
229 proc_id,
230 proc_muxer: MailboxMuxer::new(),
231 forwarder,
232 roots: DashMap::new(),
233 instances: DashMap::new(),
234 terminated_snapshots: DashMap::new(),
235 supervision_coordinator_port: OnceLock::new(),
236 }),
237 }
238 }
239
240 pub fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
242 let (addr, rx) = channel::serve(addr)?;
243 let proc_id = reference::ProcId::with_name(addr, name);
244 let proc = Self::configured(proc_id, DialMailboxRouter::new().into_boxed());
245 proc.clone().serve(rx);
246 Ok(proc)
247 }
248
249 pub fn set_supervision_coordinator(
252 &self,
253 port: PortHandle<ActorSupervisionEvent>,
254 ) -> Result<(), anyhow::Error> {
255 self.state()
256 .supervision_coordinator_port
257 .set(port)
258 .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
259 }
260
261 pub fn handle_unhandled_supervision_event(
264 &self,
265 cx: &impl context::Actor,
266 event: ActorSupervisionEvent,
267 ) {
268 let result = match self.state().supervision_coordinator_port.get() {
269 Some(port) => port.send(cx, event.clone()).map_err(anyhow::Error::from),
270 None => Err(anyhow::anyhow!(
271 "coordinator port is not set for proc {}",
272 self.proc_id(),
273 )),
274 };
275 if let Err(err) = result {
276 tracing::error!(
277 "proc {}: could not propagate supervision event {} due to error: {:?}: crashing",
278 self.proc_id(),
279 event,
280 err
281 );
282
283 std::process::exit(1);
284 }
285 }
286
287 pub fn local() -> Self {
290 let rank = NEXT_LOCAL_RANK.fetch_add(1, Ordering::Relaxed);
291 let addr = ChannelAddr::any(ChannelTransport::Local);
292 let proc_id = reference::ProcId::unique(addr, format!("local_{}", rank));
293 Proc::configured(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
294 }
295
296 pub fn proc_id(&self) -> &reference::ProcId {
298 &self.state().proc_id
299 }
300
301 pub fn forwarder(&self) -> &BoxedMailboxSender {
304 &self.inner.forwarder
305 }
306
307 fn state(&self) -> &ProcState {
309 self.inner.as_ref()
310 }
311
312 pub(crate) fn runtime() -> &'static Proc {
314 static RUNTIME_PROC: OnceLock<Proc> = OnceLock::new();
315 RUNTIME_PROC.get_or_init(|| {
316 let addr = ChannelAddr::any(ChannelTransport::Local);
317 let proc_id = reference::ProcId::unique(addr, "hyperactor_runtime");
318 Proc::configured(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
319 })
320 }
321
322 pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
324 let actor_id: reference::ActorId = self.allocate_root_id(name)?;
325 Ok(self.bind_mailbox(actor_id))
326 }
327
328 pub fn attach_child(&self, parent_id: &reference::ActorId) -> Result<Mailbox, anyhow::Error> {
330 let actor_id: reference::ActorId = self.allocate_child_id(parent_id)?;
331 Ok(self.bind_mailbox(actor_id))
332 }
333
334 fn bind_mailbox(&self, actor_id: reference::ActorId) -> Mailbox {
336 let mbox = Mailbox::new(actor_id, BoxedMailboxSender::new(self.downgrade()));
337
338 self.state().proc_muxer.bind_mailbox(mbox.clone());
341 mbox
342 }
343
344 pub fn attach_actor<R, M>(
347 &self,
348 name: &str,
349 ) -> Result<(Instance<()>, reference::ActorRef<R>, PortReceiver<M>), anyhow::Error>
350 where
351 M: RemoteMessage,
352 R: Referable + RemoteHandles<M>,
353 {
354 let (instance, _handle) = self.instance(name)?;
355 let (_handle, rx) = instance.bind_actor_port::<M>();
356 let actor_ref = reference::ActorRef::attest(instance.self_id().clone());
357 Ok((instance, actor_ref, rx))
358 }
359
360 pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> Result<ActorHandle<A>, anyhow::Error> {
363 let actor_id = self.allocate_root_id(name)?;
364 self.spawn_inner(actor_id, actor, None)
365 }
366
367 #[hyperactor::instrument(fields(actor_id = actor_id.to_string(), actor_name = actor_id.name(), actor_type = std::any::type_name::<A>()))]
370 fn spawn_inner<A: Actor>(
371 &self,
372 actor_id: reference::ActorId,
373 actor: A,
374 parent: Option<InstanceCell>,
375 ) -> Result<ActorHandle<A>, anyhow::Error> {
376 let (instance, receivers) = Instance::new(self.clone(), actor_id, false, parent);
377 Ok(instance.start(actor, receivers))
378 }
379
380 pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
385 let actor_id = self.allocate_root_id(name)?;
386 let (instance, _receivers) = Instance::new(self.clone(), actor_id, false, None);
387 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
388 instance.change_status(ActorStatus::Client);
389 Ok((instance, handle))
390 }
391
392 pub fn introspectable_instance(
405 &self,
406 name: &str,
407 ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
408 let actor_id = self.allocate_root_id(name)?;
409 let (instance, receivers) = Instance::new(self.clone(), actor_id, false, None);
410 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
411 instance.change_status(ActorStatus::Client);
412 tokio::spawn(crate::introspect::serve_introspect(
413 instance.inner.cell.clone(),
414 instance.inner.mailbox.clone(),
415 receivers.introspect,
416 ));
417 Ok((instance, handle))
418 }
419
420 pub fn actor_instance<A: Actor>(&self, name: &str) -> Result<ActorInstance<A>, anyhow::Error> {
426 let actor_id = self.allocate_root_id(name)?;
427 let span = tracing::debug_span!(
428 "actor_instance",
429 actor_name = name,
430 actor_type = std::any::type_name::<A>(),
431 actor_id = actor_id.to_string(),
432 );
433 let _guard = span.enter();
434 let (instance, receivers) = Instance::new(self.clone(), actor_id.clone(), false, None);
435 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
436 instance.change_status(ActorStatus::Client);
437
438 let introspect_cell = instance.inner.cell.clone();
439 let introspect_mailbox = instance.inner.mailbox.clone();
440 tokio::spawn(crate::introspect::serve_introspect(
441 introspect_cell,
442 introspect_mailbox,
443 receivers.introspect,
444 ));
445
446 let (signal_rx, supervision_rx) = receivers.actor_loop.unwrap();
447 Ok(ActorInstance {
448 instance,
449 handle,
450 supervision: supervision_rx,
451 signal: signal_rx,
452 work: receivers.work,
453 })
454 }
455
456 pub fn traverse<F>(&self, f: &mut F)
465 where
466 F: FnMut(&InstanceCell, usize),
467 {
468 for entry in self.state().instances.iter() {
469 if entry.key().pid() == 0 {
470 if let Some(cell) = entry.value().upgrade() {
471 cell.traverse(f);
472 }
473 }
474 }
475 }
476
477 pub fn get_instance(&self, actor_id: &reference::ActorId) -> Option<InstanceCell> {
479 self.state()
480 .instances
481 .get(actor_id)
482 .and_then(|weak| weak.upgrade())
483 }
484
485 pub fn root_actor_ids(&self) -> Vec<reference::ActorId> {
494 self.state()
495 .instances
496 .iter()
497 .filter(|entry| entry.key().pid() == 0)
498 .map(|entry| entry.key().clone())
499 .collect()
500 }
501
502 pub fn all_actor_ids(&self) -> Vec<reference::ActorId> {
511 self.state()
512 .instances
513 .iter()
514 .filter(|entry| {
515 entry
516 .value()
517 .upgrade()
518 .is_some_and(|cell| !cell.status().borrow().is_terminal())
519 })
520 .map(|entry| entry.key().clone())
521 .collect()
522 }
523
524 pub fn all_instance_keys(&self) -> Vec<reference::ActorId> {
536 self.state()
537 .instances
538 .iter()
539 .map(|entry| entry.key().clone())
540 .collect()
541 }
542
543 pub fn terminated_snapshot(
545 &self,
546 actor_id: &reference::ActorId,
547 ) -> Option<crate::introspect::IntrospectResult> {
548 self.state()
549 .terminated_snapshots
550 .get(actor_id)
551 .map(|e| e.value().clone())
552 }
553
554 pub fn all_terminated_actor_ids(&self) -> Vec<reference::ActorId> {
556 self.state()
557 .terminated_snapshots
558 .iter()
559 .map(|e| e.key().clone())
560 .collect()
561 }
562
563 fn child_instance(
565 &self,
566 parent: InstanceCell,
567 ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
568 let actor_id = self.allocate_child_id(parent.actor_id())?;
569 let _ = tracing::debug_span!(
570 "child_actor_instance",
571 parent_actor_id = %parent.actor_id(),
572 actor_type = std::any::type_name::<()>(),
573 actor_id = %actor_id,
574 );
575
576 let (instance, _receivers) = Instance::new(self.clone(), actor_id, false, Some(parent));
577 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
580 instance.change_status(ActorStatus::Client);
581 Ok((instance, handle))
582 }
583
584 pub(crate) fn spawn_child<A: Actor>(
590 &self,
591 parent: InstanceCell,
592 actor: A,
593 ) -> Result<ActorHandle<A>, anyhow::Error> {
594 let actor_id = self.allocate_child_id(parent.actor_id())?;
595 self.spawn_inner(actor_id, actor, Some(parent))
596 }
597
598 pub(crate) fn spawn_named_child<A: Actor>(
602 &self,
603 parent: InstanceCell,
604 name: &str,
605 actor: A,
606 ) -> Result<ActorHandle<A>, anyhow::Error> {
607 let actor_id = self.allocate_named_child_id(parent.actor_id(), name)?;
608 self.spawn_inner(actor_id, actor, Some(parent))
609 }
610
611 pub fn abort_root_actor(
615 &self,
616 root: &reference::ActorId,
617 this_handle: Option<&JoinHandle<()>>,
618 ) -> Option<impl Future<Output = reference::ActorId>> {
619 self.state()
620 .instances
621 .get(root)
622 .into_iter()
623 .flat_map(|e| e.upgrade())
624 .map(|cell| {
625 let r1 = root.clone();
626 let r2 = root.clone();
627 let skip_abort = this_handle.is_some_and(|this_h| {
629 cell.inner
630 .actor_task_handle
631 .get()
632 .is_some_and(|other_h| std::ptr::eq(this_h, other_h))
633 });
634 async move {
638 tokio::task::spawn_blocking(move || {
639 if !skip_abort {
640 let h = cell.inner.actor_task_handle.wait();
641 tracing::debug!("{}: aborting {:?}", r1, h);
642 h.abort();
643 }
644 })
645 .await
646 .unwrap();
647 r2
648 }
649 })
650 .next()
651 }
652
653 pub fn stop_actor(
656 &self,
657 actor_id: &reference::ActorId,
658 reason: String,
659 ) -> Option<watch::Receiver<ActorStatus>> {
660 if let Some(entry) = self.state().instances.get(actor_id) {
661 match entry.value().upgrade() {
662 None => None, Some(cell) => {
664 tracing::info!("sending stop signal to {}", cell.actor_id());
665 if let Err(err) = cell.signal(Signal::DrainAndStop(reason)) {
666 tracing::error!(
667 "{}: failed to send stop signal to pid {}: {:?}",
668 self.proc_id(),
669 cell.pid(),
670 err
671 );
672 None
673 } else {
674 Some(cell.status().clone())
675 }
676 }
677 }
678 } else {
679 tracing::error!("no actor {} found in {}", actor_id, self.proc_id());
680 None
681 }
682 }
683
684 pub async fn destroy_and_wait<A: Actor>(
692 &mut self,
693 timeout: Duration,
694 cx: Option<&Context<'_, A>>,
695 reason: &str,
696 ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error> {
697 self.destroy_and_wait_except_current::<A>(timeout, cx, false, reason)
698 .await
699 }
700
    /// Stops every root actor of this proc, waits up to `timeout` for each
    /// one to reach a terminal status, and aborts those that did not.
    ///
    /// `cx`, when given, identifies the actor this call is made from. That
    /// actor is not waited on, and its own task is only aborted at the very
    /// end, and only when `except_current` is `false`.
    ///
    /// Returns `(stopped, aborted)`: the ids of the actors observed reaching
    /// a terminal status within `timeout`, and the ids of all other actors
    /// that were signalled. Because the calling actor is never waited on, it
    /// is always reported in the second list.
    ///
    /// # Panics
    ///
    /// Panics if `cx` is given but its actor task handle is not available.
    #[hyperactor::instrument]
    pub async fn destroy_and_wait_except_current<A: Actor>(
        &mut self,
        timeout: Duration,
        cx: Option<&Context<'_, A>>,
        except_current: bool,
        reason: &str,
    ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error> {
        tracing::debug!("{}: proc stopping", self.proc_id());

        // Task handle and id of the calling actor, if there is one.
        let (this_handle, this_actor_id) = cx.map_or((None, None), |cx| {
            (
                Some(cx.actor_task_handle().expect("cannot call destroy_and_wait from inside an actor unless actor has finished starting")),
                Some(cx.self_id())
            )
        });

        // Signal every root actor (pid 0). The keys are collected first so
        // the map is not being iterated while `stop_actor` looks entries up.
        // Actors for which `stop_actor` returns `None` are not tracked.
        let mut statuses = HashMap::new();
        for actor_id in self
            .state()
            .instances
            .iter()
            .filter(|entry| entry.key().pid() == 0)
            .map(|entry| entry.key().clone())
            .collect::<Vec<_>>()
        {
            if let Some(status) = self.stop_actor(&actor_id, reason.to_string()) {
                statuses.insert(actor_id, status);
            }
        }
        tracing::debug!("{}: proc stopped", self.proc_id());

        // Wait for every signalled actor except the caller to become
        // terminal; a wait that times out yields `None`.
        let waits: Vec<_> = statuses
            .iter_mut()
            .filter(|(actor_id, _)| Some(*actor_id) != this_actor_id)
            .map(|(actor_id, root)| {
                let actor_id = actor_id.clone();
                async move {
                    tokio::time::timeout(
                        timeout,
                        root.wait_for(|state: &ActorStatus| state.is_terminal()),
                    )
                    .await
                    .ok()
                    .map(|_| actor_id)
                }
            })
            .collect();

        let results = futures::future::join_all(waits).await;
        let stopped_actors: Vec<_> = results
            .iter()
            .filter_map(|actor_id| actor_id.as_ref())
            .cloned()
            .collect();
        // Everything that was signalled but not seen stopping is aborted.
        // `abort_root_actor` skips the abort for the caller's own task.
        let aborted_actors: Vec<_> = statuses
            .iter()
            .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
            .map(|(actor_id, _)| {
                let f = self.abort_root_actor(actor_id, this_handle);
                async move {
                    let _ = if let Some(f) = f { Some(f.await) } else { None };
                    actor_id.clone()
                }
            })
            .collect();
        let aborted_actors = futures::future::join_all(aborted_actors).await;

        // Abort the caller's own task last, unless asked to spare it.
        if let Some(this_handle) = this_handle
            && let Some(this_actor_id) = this_actor_id
            && !except_current
        {
            tracing::debug!("{}: aborting (delayed) {:?}", this_actor_id, this_handle);
            this_handle.abort()
        };

        tracing::info!(
            "destroy_and_wait: {} actors stopped, {} actors aborted",
            stopped_actors.len(),
            aborted_actors.len()
        );
        Ok((stopped_actors, aborted_actors))
    }
797
798 pub fn resolve_actor_ref<R: Actor + Referable>(
820 &self,
821 actor_ref: &reference::ActorRef<R>,
822 ) -> Option<ActorHandle<R>> {
823 let cell = self.inner.instances.get(actor_ref.actor_id())?.upgrade()?;
824 if cell.status().borrow().is_terminal() {
828 return None;
829 }
830 cell.downcast_handle()
831 }
832
833 fn allocate_root_id(&self, name: &str) -> Result<reference::ActorId, anyhow::Error> {
835 let name = name.to_string();
836 match self.state().roots.entry(name.to_string()) {
837 Entry::Vacant(entry) => {
838 entry.insert(AtomicUsize::new(1));
839 }
840 Entry::Occupied(_) => {
841 anyhow::bail!("an actor with name '{}' has already been spawned", name)
842 }
843 }
844 Ok(reference::ActorId::new(
845 self.state().proc_id.clone(),
846 name.to_string(),
847 0,
848 ))
849 }
850
851 #[hyperactor::instrument(fields(actor_name=parent_id.name()))]
853 pub(crate) fn allocate_child_id(
854 &self,
855 parent_id: &reference::ActorId,
856 ) -> Result<reference::ActorId, anyhow::Error> {
857 assert_eq!(*parent_id.proc_id(), self.state().proc_id);
858 let pid = match self.state().roots.get(parent_id.name()) {
859 None => anyhow::bail!(
860 "no actor named {} in proc {}",
861 parent_id.name(),
862 self.state().proc_id
863 ),
864 Some(next_pid) => next_pid.fetch_add(1, Ordering::Relaxed),
865 };
866 Ok(parent_id.child_id(pid))
867 }
868
869 pub(crate) fn allocate_named_child_id(
874 &self,
875 parent_id: &reference::ActorId,
876 name: &str,
877 ) -> Result<reference::ActorId, anyhow::Error> {
878 let inherited = self.allocate_child_id(parent_id)?;
879 Ok(reference::ActorId::new(
880 inherited.proc_id().clone(),
881 name,
882 inherited.pid(),
883 ))
884 }
885
886 pub fn downgrade(&self) -> WeakProc {
888 WeakProc::new(self)
889 }
890}
891
892#[async_trait]
893impl MailboxSender for Proc {
894 fn post_unchecked(
895 &self,
896 envelope: MessageEnvelope,
897 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
898 ) {
899 if envelope.dest().actor_id().proc_id() == &self.state().proc_id {
900 self.state().proc_muxer.post(envelope, return_handle)
901 } else {
902 self.state().forwarder.post(envelope, return_handle)
903 }
904 }
905}
906
/// A weak reference to a [`Proc`]; it does not keep the proc's state alive.
/// Use [`WeakProc::upgrade`] to try to obtain a `Proc` again.
#[derive(Clone, Debug)]
pub struct WeakProc(Weak<ProcState>);
910
911impl WeakProc {
912 fn new(proc: &Proc) -> Self {
913 Self(Arc::downgrade(&proc.inner))
914 }
915
916 pub fn upgrade(&self) -> Option<Proc> {
918 self.0.upgrade().map(|inner| Proc { inner })
919 }
920}
921
922#[async_trait]
923impl MailboxSender for WeakProc {
924 fn post_unchecked(
925 &self,
926 envelope: MessageEnvelope,
927 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
928 ) {
929 match self.upgrade() {
930 Some(proc) => proc.post(envelope, return_handle),
931 None => envelope.undeliverable(
932 DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
933 return_handle,
934 ),
935 }
936 }
937}
938
/// A boxed, one-shot unit of work for an actor of type `A`.
///
/// The wrapped closure receives mutable access to the actor and a reference
/// to its [`Instance`], and returns a boxed future that resolves to
/// `Result<(), anyhow::Error>`.
pub struct WorkCell<A: Actor + Send>(
    Box<
        dyn for<'a> FnOnce(
                &'a mut A,
                &'a Instance<A>,
            )
                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
            + Send
            + Sync,
    >,
);
952
953impl<A: Actor + Send> WorkCell<A> {
954 fn new(
956 f: impl for<'a> FnOnce(
957 &'a mut A,
958 &'a Instance<A>,
959 )
960 -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
961 + Send
962 + Sync
963 + 'static,
964 ) -> Self {
965 Self(Box::new(f))
966 }
967
968 pub fn handle<'a>(
970 self,
971 actor: &'a mut A,
972 instance: &'a Instance<A>,
973 ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
974 (self.0)(actor, instance)
975 }
976}
977
/// A borrowed [`Instance`] paired with a set of headers.
///
/// `Context` dereferences to the instance it wraps.
pub struct Context<'a, A: Actor> {
    /// The instance this context refers to.
    instance: &'a Instance<A>,
    /// The headers carried by this context.
    headers: Flattrs,
}
983
984impl<'a, A: Actor> Context<'a, A> {
985 pub fn new(instance: &'a Instance<A>, headers: Flattrs) -> Self {
987 Self { instance, headers }
988 }
989
990 pub fn headers(&self) -> &Flattrs {
992 &self.headers
993 }
994}
995
996impl<A: Actor> Deref for Context<'_, A> {
997 type Target = Instance<A>;
998
999 fn deref(&self) -> &Self::Target {
1000 self.instance
1001 }
1002}
1003
/// An instance of actor type `A` on a [`Proc`].
///
/// All of its state lives behind an [`Arc`] in `InstanceState`.
pub struct Instance<A: Actor> {
    /// The instance's state.
    inner: Arc<InstanceState<A>>,
}
1010
1011impl<A: Actor> fmt::Debug for Instance<A> {
1012 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1013 f.debug_struct("Instance").field("inner", &"..").finish()
1014 }
1015}
1016
/// The state behind an [`Instance`].
struct InstanceState<A: Actor> {
    /// The proc this instance was created on.
    proc: Proc,

    /// The instance cell created for this instance.
    cell: InstanceCell,

    /// The instance's mailbox; its actor id is the instance's id.
    mailbox: Mailbox,

    /// The instance's ports.
    ports: Arc<Ports<A>>,

    /// Sending side of the status watch channel; the receiving side is
    /// handed to the instance cell at construction.
    status_tx: watch::Sender<ActorStatus>,

    /// UUID generated for this instance at construction.
    id: Uuid,

    /// Sequencer created from the same UUID as `id`.
    sequencer: Sequencer,

    /// Storage returned by `Instance::locals`.
    instance_locals: ActorLocalStorage,
}
1041
1042impl<A: Actor> InstanceState<A> {
1043 fn self_id(&self) -> &reference::ActorId {
1044 self.mailbox.actor_id()
1045 }
1046}
1047
1048impl<A: Actor> Drop for InstanceState<A> {
1049 fn drop(&mut self) {
1050 self.status_tx.send_if_modified(|status| {
1051 if status.is_terminal() {
1052 false
1053 } else {
1054 tracing::info!(
1055 name = "ActorStatus",
1056 actor_id = %self.self_id(),
1057 actor_name = self.self_id().name(),
1058 status = "Stopped",
1059 prev_status = status.arm().unwrap_or("unknown"),
1060 "instance is dropped",
1061 );
1062 *status = ActorStatus::Stopped("instance is dropped".into());
1063 true
1064 }
1065 });
1066 }
1067}
1068
/// The receiving ends created together with an [`Instance`].
pub struct InstanceReceivers<A: Actor> {
    /// Signal and supervision-event receivers. `None` when the instance was
    /// created as detached.
    actor_loop: Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,
    /// Receiver of the work cells queued for the instance.
    work: mpsc::UnboundedReceiver<WorkCell<A>>,
    /// Receiver for introspection messages.
    introspect: PortReceiver<IntrospectMessage>,
}
1084
1085impl<A: Actor> Instance<A> {
    /// Creates an instance for `actor_id` on `proc`, together with the
    /// receivers that belong to it.
    ///
    /// When `detached` is `true`, no signal or supervision ports are opened
    /// and `InstanceReceivers::actor_loop` is `None`. `parent`, if given, is
    /// passed on to the new instance cell.
    ///
    /// The instance starts in [`ActorStatus::Created`].
    ///
    /// # Panics
    ///
    /// Panics if opening the signal or introspection message port fails.
    fn new(
        proc: Proc,
        actor_id: reference::ActorId,
        detached: bool,
        parent: Option<InstanceCell>,
    ) -> (Self, InstanceReceivers<A>) {
        // The mailbox sends through a weak reference to the proc.
        let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
        let (work_tx, work_rx) = ordered_channel(
            actor_id.to_string(),
            hyperactor_config::global::get(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER),
        );
        let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
        proc.state().proc_muxer.bind_mailbox(mailbox.clone());
        let (status_tx, status_rx) = watch::channel(ActorStatus::Created);

        // Use the registered type info when `A` has one, otherwise fall back
        // to the Rust type name.
        let actor_type = match TypeInfo::of::<A>() {
            Some(info) => ActorType::Named(info),
            None => ActorType::Anonymous(std::any::type_name::<A>()),
        };
        // Detached instances get no signal/supervision ports.
        let actor_loop_ports = if detached {
            None
        } else {
            let (signal_port, signal_receiver) = ports.open_message_port().unwrap();
            let (supervision_port, supervision_receiver) = mailbox.open_port();
            Some((
                (signal_port, supervision_port),
                (signal_receiver, supervision_receiver),
            ))
        };

        // Split into the sending halves (kept by the cell) and the receiving
        // halves (returned to the caller).
        let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();

        // The introspection port is opened and bound for every instance,
        // detached or not.
        let (introspect_port, introspect_receiver) =
            ports.open_message_port::<IntrospectMessage>().unwrap();
        introspect_port.bind_actor_port();

        let cell = InstanceCell::new(
            actor_id,
            actor_type,
            proc.clone(),
            actor_loop,
            status_rx,
            parent,
            ports.clone(),
        );
        // The same UUID identifies the instance and seeds its sequencer.
        let instance_id = Uuid::now_v7();
        let inner = Arc::new(InstanceState {
            proc,
            cell,
            mailbox,
            ports,
            status_tx,
            sequencer: Sequencer::new(instance_id),
            id: instance_id,
            instance_locals: ActorLocalStorage::new(),
        });
        (
            Self { inner },
            InstanceReceivers {
                actor_loop: actor_loop_receivers,
                work: work_rx,
                introspect: introspect_receiver,
            },
        )
    }
1160
1161 #[track_caller]
1164 fn change_status(&self, new: ActorStatus) {
1165 let old = self.inner.status_tx.send_replace(new.clone());
1166 assert!(
1172 !old.is_terminal() && !new.is_terminal()
1173 || !old.is_terminal() && new.is_terminal()
1174 || old == new,
1175 "actor changing status illegally, only allow non-terminal -> non-terminal \
1176 and non-terminal -> terminal statuses. actor_id={}, prev_status={}, status={}",
1177 self.self_id(),
1178 old,
1179 new
1180 );
1181 if !((old.is_idle() && new.is_processing())
1186 || (old.is_processing() && new.is_idle())
1187 || old == new)
1188 {
1189 let new_status = new.arm().unwrap_or("unknown");
1190 let change_reason = match new {
1191 ActorStatus::Failed(reason) => reason.to_string(),
1192 _ => "".to_string(),
1193 };
1194 tracing::info!(
1195 name = "ActorStatus",
1196 actor_id = %self.self_id(),
1197 actor_name = self.self_id().name(),
1198 status = new_status,
1199 prev_status = old.arm().unwrap_or("unknown"),
1200 caller = %Location::caller(),
1201 change_reason,
1202 );
1203 let actor_id = hash_to_u64(self.self_id());
1204 notify_actor_status_changed(ActorStatusEvent {
1205 id: generate_actor_status_event_id(actor_id),
1206 timestamp: std::time::SystemTime::now(),
1207 actor_id,
1208 new_status: new_status.to_string(),
1209 reason: if change_reason.is_empty() {
1210 None
1211 } else {
1212 Some(change_reason)
1213 },
1214 });
1215 }
1216 }
1217
1218 fn is_terminal(&self) -> bool {
1219 self.inner.status_tx.borrow().is_terminal()
1220 }
1221
1222 fn is_stopping(&self) -> bool {
1223 self.inner.status_tx.borrow().is_stopping()
1224 }
1225
1226 pub fn self_id(&self) -> &reference::ActorId {
1228 self.inner.self_id()
1229 }
1230
1231 pub fn introspect_payload(&self) -> crate::introspect::IntrospectResult {
1247 crate::introspect::live_actor_payload(&self.inner.cell)
1248 }
1249
1250 pub fn publish_attrs(&self, attrs: hyperactor_config::Attrs) {
1258 #[cfg(debug_assertions)]
1259 {
1260 use std::collections::HashSet;
1261 use std::sync::OnceLock;
1262
1263 use hyperactor_config::attrs::AttrKeyInfo;
1264
1265 static INTROSPECT_KEYS: OnceLock<HashSet<&'static str>> = OnceLock::new();
1266 let allowed = INTROSPECT_KEYS.get_or_init(|| {
1267 inventory::iter::<AttrKeyInfo>()
1268 .filter(|info| info.meta.get(hyperactor_config::INTROSPECT).is_some())
1269 .map(|info| info.name)
1270 .collect()
1271 });
1272 for (name, _) in attrs.iter() {
1273 debug_assert!(
1274 allowed.contains(name),
1275 "publish_attrs: key {:?} is not tagged with INTROSPECT",
1276 name
1277 );
1278 }
1279 }
1280 self.inner.cell.set_published_attrs(attrs);
1281 }
1282
1283 pub fn publish_attr<T: hyperactor_config::AttrValue>(
1289 &self,
1290 key: hyperactor_config::Key<T>,
1291 value: T,
1292 ) {
1293 debug_assert!(
1294 key.attrs().get(hyperactor_config::INTROSPECT).is_some(),
1295 "publish_attr called with non-introspection key: {}",
1296 key.name()
1297 );
1298 self.inner.cell.merge_published_attr(key, value);
1299 }
1300
1301 pub fn set_system(&self) {
1304 self.inner
1305 .cell
1306 .inner
1307 .is_system
1308 .store(true, Ordering::Relaxed);
1309 }
1310
1311 pub fn set_query_child_handler(
1320 &self,
1321 handler: impl (Fn(&crate::reference::Reference) -> IntrospectResult) + Send + Sync + 'static,
1322 ) {
1323 self.inner.cell.set_query_child_handler(handler);
1324 }
1325
1326 pub fn stop(&self, reason: &str) -> Result<(), ActorError> {
1328 tracing::info!(
1329 actor_id = %self.inner.cell.actor_id(),
1330 reason,
1331 "Instance::stop called",
1332 );
1333 self.inner
1334 .cell
1335 .signal(Signal::DrainAndStop(reason.to_string()))
1336 }
1337
1338 pub fn abort(&self, reason: &str) -> Result<(), ActorError> {
1340 tracing::info!(
1341 actor_id = %self.inner.cell.actor_id(),
1342 reason,
1343 "Instance::abort called",
1344 );
1345 self.inner.cell.signal(Signal::Abort(reason.to_string()))
1346 }
1347
1348 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1353 self.inner.mailbox.open_port()
1354 }
1355
1356 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1360 self.inner.mailbox.open_once_port()
1361 }
1362
1363 pub fn locals(&self) -> &ActorLocalStorage {
1365 &self.inner.instance_locals
1366 }
1367
1368 pub fn post(&self, port_id: reference::PortId, headers: Flattrs, message: wirevalue::Any) {
1370 <Self as context::MailboxExt>::post(
1371 self,
1372 port_id,
1373 headers,
1374 message,
1375 true,
1376 context::SeqInfoPolicy::AssignNew,
1377 )
1378 }
1379
1380 #[doc(hidden)]
1386 pub fn post_with_external_seq_info(
1387 &self,
1388 port_id: reference::PortId,
1389 headers: Flattrs,
1390 message: wirevalue::Any,
1391 ) {
1392 <Self as context::MailboxExt>::post(
1393 self,
1394 port_id,
1395 headers,
1396 message,
1397 true,
1398 context::SeqInfoPolicy::AllowExternal,
1399 )
1400 }
1401
1402 pub fn self_message_with_delay<M>(&self, message: M, delay: Duration) -> Result<(), ActorError>
1404 where
1405 M: Message,
1406 A: Handler<M>,
1407 {
1408 static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
1410 let client = &CLIENT
1411 .get_or_init(|| Proc::runtime().instance("self_message_client").unwrap())
1412 .0;
1413 let port = self.port();
1414 let self_id = self.self_id().clone();
1415 tokio::spawn(async move {
1416 tokio::time::sleep(delay).await;
1417 if let Err(e) = port.send(&client, message) {
1418 tracing::info!("{}: error sending delayed message: {}", self_id, e);
1421 }
1422 });
1423 Ok(())
1424 }
1425
    /// Starts `actor` on this instance and returns a handle to it.
    ///
    /// Spawns a task serving introspection requests, then spawns the actor's
    /// server task through `A::spawn_server_task` and stores the resulting
    /// task handle on the instance cell.
    ///
    /// # Panics
    ///
    /// Panics if `receivers` carries no actor-loop receivers (i.e. the
    /// instance was created detached), or if a task handle has already been
    /// stored on the cell.
    fn start(self, actor: A, receivers: InstanceReceivers<A>) -> ActorHandle<A> {
        // Clone what is needed up front: `self` is consumed by `serve` below.
        let instance_cell = self.inner.cell.clone();
        let actor_id = self.inner.cell.actor_id().clone();
        let actor_handle = ActorHandle::new(self.inner.cell.clone(), self.inner.ports.clone());

        let introspect_cell = self.inner.cell.clone();
        let introspect_mailbox = self.inner.mailbox.clone();
        tokio::spawn(crate::introspect::serve_introspect(
            introspect_cell,
            introspect_mailbox,
            receivers.introspect,
        ));

        let actor_loop_receivers = receivers
            .actor_loop
            .expect("non-detached instance must have actor loop receivers");
        // The serve future is wrapped for panic backtrace tracking and runs
        // inside the span that is current at spawn time.
        let actor_task_handle = A::spawn_server_task(
            panic_handler::with_backtrace_tracking(self.serve(
                actor,
                actor_loop_receivers,
                receivers.work,
            ))
            .instrument(Span::current()),
        );
        tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
        // Record the task handle on the cell so the task can be aborted later.
        instance_cell
            .inner
            .actor_task_handle
            .set(actor_task_handle)
            .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));

        actor_handle
    }
1464
1465 async fn serve(
1466 mut self,
1467 mut actor: A,
1468 actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1469 mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
1470 ) {
1471 let result = self
1472 .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
1473 .await;
1474
1475 assert!(self.is_stopping());
1476 let event = match result {
1477 Ok(stop_reason) => {
1478 let status = ActorStatus::Stopped(stop_reason);
1479 self.mailbox().close(status.clone());
1480 self.change_status(status);
1482 None
1483 }
1484 Err(err) => {
1485 match *err.kind {
1486 ActorErrorKind::UnhandledSupervisionEvent(box event) => {
1487 assert!(event.actor_status.is_terminal());
1492 self.mailbox().close(event.actor_status.clone());
1493 *self.inner.cell.inner.supervision_event.lock().unwrap() =
1495 Some(event.clone());
1496 self.change_status(event.actor_status.clone());
1497 Some(event)
1498 }
1499 _ => {
1500 let error_kind = ActorErrorKind::Generic(err.kind.to_string());
1501 let status = ActorStatus::Failed(error_kind);
1502 self.mailbox().close(status.clone());
1503 let event = ActorSupervisionEvent::new(
1504 self.inner.cell.actor_id().clone(),
1505 actor.display_name(),
1506 status.clone(),
1507 None,
1508 );
1509 *self.inner.cell.inner.supervision_event.lock().unwrap() =
1511 Some(event.clone());
1512 self.change_status(status);
1513 Some(event)
1514 }
1515 }
1516 }
1517 };
1518
1519 if let Some(parent) = self.inner.cell.maybe_unlink_parent() {
1520 if let Some(event) = event {
1521 parent.send_supervision_event_or_crash(&self, event);
1523 }
1524 if let Err(err) = parent.signal(Signal::ChildStopped(self.inner.cell.pid())) {
1527 tracing::error!(
1528 "{}: failed to send stop message to parent pid {}: {:?}",
1529 self.self_id(),
1530 parent.pid(),
1531 err
1532 );
1533 }
1534 } else {
1535 if let Some(event) = event {
1541 self.inner
1542 .proc
1543 .handle_unhandled_supervision_event(&self, event);
1544 }
1545 }
1546 }
1547
1548 async fn run_actor_tree(
1552 &mut self,
1553 actor: &mut A,
1554 mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1555 work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
1556 ) -> Result<String, ActorError> {
1557 let mut did_panic = false;
1563 let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers, work_rx))
1564 .catch_unwind()
1565 .await
1566 {
1567 Ok(result) => result,
1568 Err(_) => {
1569 did_panic = true;
1570 let panic_info = panic_handler::take_panic_info()
1571 .map(|info| info.to_string())
1572 .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
1573 Err(ActorError::new(
1574 self.self_id(),
1575 ActorErrorKind::panic(anyhow::anyhow!(panic_info)),
1576 ))
1577 }
1578 };
1579
1580 assert!(!self.is_terminal());
1581 self.change_status(ActorStatus::Stopping);
1582 if let Err(err) = &result {
1583 tracing::error!("{}: actor failure: {}", self.self_id(), err);
1584 }
1585
1586 let mut to_unlink = Vec::new();
1589 for child in self.inner.cell.child_iter() {
1590 if let Err(err) = child
1591 .value()
1592 .signal(Signal::Stop("parent stopping".to_string()))
1593 {
1594 tracing::error!(
1595 "{}: failed to send stop signal to child pid {}: {:?}",
1596 self.self_id(),
1597 child.key(),
1598 err
1599 );
1600 to_unlink.push(child.value().clone());
1601 }
1602 }
1603 for child in to_unlink {
1605 self.inner.cell.unlink(&child);
1606 }
1607
1608 let (mut signal_receiver, _) = actor_loop_receivers;
1609 while self.inner.cell.child_count() > 0 {
1610 match tokio::time::timeout(Duration::from_millis(500), signal_receiver.recv()).await {
1611 Ok(signal) => {
1612 if let Signal::ChildStopped(pid) = signal? {
1613 assert!(self.inner.cell.get_child(pid).is_none());
1614 }
1615 }
1616 Err(_) => {
1617 tracing::warn!(
1618 "timeout waiting for ChildStopped signal from child on actor: {}, ignoring",
1619 self.self_id()
1620 );
1621 self.inner.cell.unlink_all();
1624 break;
1625 }
1626 }
1627 }
1628 let cleanup_result = if !did_panic {
1634 let cleanup_timeout = hyperactor_config::global::get(config::CLEANUP_TIMEOUT);
1635 match tokio::time::timeout(cleanup_timeout, actor.cleanup(self, result.as_ref().err()))
1636 .await
1637 {
1638 Ok(Ok(x)) => Ok(x),
1639 Ok(Err(e)) => Err(ActorError::new(self.self_id(), ActorErrorKind::cleanup(e))),
1640 Err(e) => Err(ActorError::new(
1641 self.self_id(),
1642 ActorErrorKind::cleanup(e.into()),
1643 )),
1644 }
1645 } else {
1646 Ok(())
1647 };
1648 if let Err(ref actor_err) = result {
1649 if let Err(ref err) = cleanup_result {
1652 tracing::warn!(
1653 cleanup_err = %err,
1654 %actor_err,
1655 "ignoring cleanup error after actor error",
1656 );
1657 }
1658 }
1659 result.and_then(|reason| cleanup_result.map(|_| reason))
1662 }
1663
/// The actor's main loop.
///
/// Initializes the actor, then repeatedly waits for the next work item,
/// signal, or supervision event until a stop signal arrives or an error
/// occurs. On `Signal::DrainAndStop`, work already queued is handled before
/// returning.
///
/// Returns the stop reason carried by the stop signal.
///
/// # Errors
///
/// Returns an `ActorError` when `init` fails, a message handler fails, the
/// signal receiver fails, `Signal::Abort` is received, or a supervision
/// event is not handled.
async fn run(
    &mut self,
    actor: &mut A,
    actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
    work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
) -> Result<String, ActorError> {
    let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;

    self.change_status(ActorStatus::Initializing);
    actor
        .init(self)
        .await
        .map_err(|err| ActorError::new(self.self_id(), ActorErrorKind::init(err)))?;
    // Both are assigned exactly once, on the arm that breaks out of the loop.
    let need_drain;
    let stop_reason;
    'messages: loop {
        self.change_status(ActorStatus::Idle);
        let metric_pairs =
            hyperactor_telemetry::kv_pairs!("actor_id" => self.self_id().to_string());
        tokio::select! {
            work = work_rx.recv() => {
                ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
                ACTOR_MESSAGE_QUEUE_SIZE.add(-1, metric_pairs);
                // NOTE(review): `let _ =` drops the value returned by `start`
                // immediately. If that value is a drop-based timing guard, the
                // recorded duration will not cover the handler below — confirm
                // against the timer API and bind it to a named local if so.
                let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
                let work = work.expect("inconsistent work queue state");
                if let Err(err) = work.handle(actor, self).await {
                    // Let the actor see pending supervision events before the
                    // handler error is reported; an unhandled event is returned
                    // instead of the handler error.
                    for supervision_event in supervision_event_receiver.drain() {
                        self.handle_supervision_event(actor, supervision_event).await?;
                    }
                    let kind = ActorErrorKind::processing(err);
                    return Err(ActorError {
                        actor_id: Box::new(self.self_id().clone()),
                        kind: Box::new(kind),
                    });
                }
            }
            signal = signal_receiver.recv() => {
                let signal = signal.map_err(ActorError::from);
                tracing::debug!("Received signal {signal:?}");
                match signal? {
                    // Stop immediately, leaving queued work unhandled.
                    Signal::Stop(reason) => {
                        need_drain = false;
                        stop_reason = reason;
                        break 'messages;
                    },
                    // Stop after handling the work that is already queued.
                    Signal::DrainAndStop(reason) => {
                        need_drain = true;
                        stop_reason = reason;
                        break 'messages;
                    },
                    // The child is expected to be unlinked already.
                    Signal::ChildStopped(pid) => {
                        assert!(self.inner.cell.get_child(pid).is_none());
                    },
                    Signal::Abort(reason) => {
                        return Err(ActorError { actor_id: Box::new(self.self_id().clone()), kind: Box::new(ActorErrorKind::Aborted(reason)) });
                    }
                }
            }
            Ok(supervision_event) = supervision_event_receiver.recv() => {
                self.handle_supervision_event(actor, supervision_event).await?;
            }
        }
        // Counts every completed loop iteration, whichever arm ran.
        self.inner
            .cell
            .inner
            .num_processed_messages
            .fetch_add(1, Ordering::SeqCst);
    }

    if need_drain {
        let mut n = 0;
        // `try_recv` never waits: only work already in the queue is handled.
        while let Ok(work) = work_rx.try_recv() {
            if let Err(err) = work.handle(actor, self).await {
                return Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::processing(err),
                ));
            }
            n += 1;
        }
        tracing::debug!("drained {} messages", n);
    }
    tracing::debug!(
        actor_id = %self.self_id(),
        reason = stop_reason,
        "exited actor loop",
    );
    Ok(stop_reason)
}
1756
1757 pub async fn handle_supervision_event(
1759 &self,
1760 actor: &mut A,
1761 supervision_event: ActorSupervisionEvent,
1762 ) -> Result<(), ActorError> {
1763 match actor
1765 .handle_supervision_event(self, &supervision_event)
1766 .await
1767 {
1768 Ok(true) => {
1769 Ok(())
1771 }
1772 Ok(false) => {
1773 let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
1774 Err(ActorError::new(self.self_id(), kind))
1775 }
1776 Err(err) => {
1777 let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
1780 err.to_string(),
1781 Box::new(supervision_event),
1782 );
1783 Err(ActorError::new(self.self_id(), kind))
1784 }
1785 }
1786 }
1787
1788 async unsafe fn handle_message<M: Message>(
1789 &self,
1790 actor: &mut A,
1791 type_info: Option<&'static TypeInfo>,
1792 headers: Flattrs,
1793 message: M,
1794 ) -> Result<(), anyhow::Error>
1795 where
1796 A: Handler<M>,
1797 {
1798 let handler_info = match type_info {
1800 Some(info) => {
1801 let arm = unsafe { info.arm_unchecked(&message as *const M as *const ()) };
1803 HandlerInfo::from_static(info.typename(), arm)
1804 }
1805 None => {
1806 HandlerInfo::from_static(std::any::type_name::<M>(), None)
1808 }
1809 };
1810 self.handle_message_with_handler_info(actor, handler_info, headers, message)
1812 .await
1813 }
1814
1815 #[tracing::instrument(level = "debug", name = "handle_message", skip_all, fields(actor_id = %self.self_id(), message_type = %handler_info))]
1817 async fn handle_message_with_handler_info<M: Message>(
1818 &self,
1819 actor: &mut A,
1820 handler_info: HandlerInfo,
1821 headers: Flattrs,
1822 message: M,
1823 ) -> Result<(), anyhow::Error>
1824 where
1825 A: Handler<M>,
1826 {
1827 let now = std::time::SystemTime::now();
1828 let handler_info = Some(handler_info);
1829 self.change_status(ActorStatus::Processing(now, handler_info.clone()));
1830 crate::mailbox::headers::log_message_latency_if_sampling(
1831 &headers,
1832 self.self_id().to_string(),
1833 );
1834
1835 let message_id = headers.get(crate::mailbox::headers::TELEMETRY_MESSAGE_ID);
1836
1837 if let Some(message_id) = message_id {
1838 let from_actor_id = headers
1839 .get(crate::mailbox::headers::SENDER_ACTOR_ID_HASH)
1840 .unwrap_or(0);
1841 let to_actor_id = hash_to_u64(self.self_id());
1842 let port_id = headers.get(crate::mailbox::headers::TELEMETRY_PORT_ID);
1843
1844 notify_message(hyperactor_telemetry::MessageEvent {
1845 timestamp: now,
1846 id: message_id,
1847 from_actor_id,
1848 to_actor_id,
1849 endpoint: None,
1851 port_id,
1852 });
1853
1854 notify_message_status(hyperactor_telemetry::MessageStatusEvent {
1855 timestamp: now,
1856 id: hyperactor_telemetry::generate_status_event_id(message_id),
1857 message_id,
1858 status: "active".to_string(),
1859 });
1860 }
1861
1862 *self.inner.cell.inner.last_message_handler.write().unwrap() = handler_info;
1864
1865 let context = Context::new(self, headers);
1866 let start = Instant::now();
1870 let result = actor
1871 .handle(&context, message)
1872 .instrument(self.inner.cell.inner.recording.span())
1873 .await;
1874 let elapsed_us = start.elapsed().as_micros() as u64;
1875 self.inner
1876 .cell
1877 .inner
1878 .total_processing_time_us
1879 .fetch_add(elapsed_us, Ordering::SeqCst);
1880
1881 if let Some(message_id) = message_id {
1882 notify_message_status(hyperactor_telemetry::MessageStatusEvent {
1883 timestamp: std::time::SystemTime::now(),
1884 id: hyperactor_telemetry::generate_status_event_id(message_id),
1885 message_id,
1886 status: "complete".to_string(),
1887 });
1888 }
1889
1890 result
1891 }
1892
1893 pub fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
1895 self.inner.proc.spawn_child(self.inner.cell.clone(), actor)
1896 }
1897
1898 pub fn spawn_with_name<C: Actor>(
1902 &self,
1903 name: &str,
1904 actor: C,
1905 ) -> anyhow::Result<ActorHandle<C>> {
1906 self.inner
1907 .proc
1908 .spawn_named_child(self.inner.cell.clone(), name, actor)
1909 }
1910
1911 pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> {
1913 self.inner.proc.child_instance(self.inner.cell.clone())
1914 }
1915
1916 pub fn port<M: Message>(&self) -> PortHandle<M>
1919 where
1920 A: Handler<M>,
1921 {
1922 self.inner.ports.get()
1923 }
1924
1925 pub fn handle(&self) -> ActorHandle<A> {
1927 ActorHandle::new(self.inner.cell.clone(), Arc::clone(&self.inner.ports))
1928 }
1929
1930 pub fn bind<R: Binds<A>>(&self) -> reference::ActorRef<R> {
1932 self.inner.cell.bind(self.inner.ports.as_ref())
1933 }
1934
1935 #[doc(hidden)]
1937 pub fn mailbox_for_py(&self) -> &Mailbox {
1938 &self.inner.mailbox
1939 }
1940
1941 pub fn proc(&self) -> &Proc {
1943 &self.inner.proc
1944 }
1945
1946 #[doc(hidden)]
1950 pub fn clone_for_py(&self) -> Self {
1951 Self {
1952 inner: Arc::clone(&self.inner),
1953 }
1954 }
1955
1956 fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1958 self.inner.cell.inner.actor_task_handle.get()
1959 }
1960
1961 pub fn sequencer(&self) -> &Sequencer {
1963 &self.inner.sequencer
1964 }
1965
1966 pub fn instance_id(&self) -> Uuid {
1968 self.inner.id
1969 }
1970
1971 pub fn parent_handle<P: Actor>(&self) -> Option<ActorHandle<P>> {
1973 let parent_cell = self.inner.cell.inner.parent.upgrade()?;
1974 let ports = if let Ok(ports) = parent_cell.inner.ports.clone().downcast() {
1975 ports
1976 } else {
1977 return None;
1978 };
1979 Some(ActorHandle::new(parent_cell, ports))
1980 }
1981}
1982
1983impl<A: Actor> context::Mailbox for Instance<A> {
1984 fn mailbox(&self) -> &Mailbox {
1985 &self.inner.mailbox
1986 }
1987}
1988
1989impl<A: Actor> context::Mailbox for Context<'_, A> {
1990 fn mailbox(&self) -> &Mailbox {
1991 &self.instance.inner.mailbox
1992 }
1993}
1994
1995impl<A: Actor> context::Mailbox for &Instance<A> {
1996 fn mailbox(&self) -> &Mailbox {
1997 &self.inner.mailbox
1998 }
1999}
2000
2001impl<A: Actor> context::Mailbox for &Context<'_, A> {
2002 fn mailbox(&self) -> &Mailbox {
2003 &self.instance.inner.mailbox
2004 }
2005}
2006
2007impl<A: Actor> context::Actor for Instance<A> {
2008 type A = A;
2009 fn instance(&self) -> &Instance<A> {
2010 self
2011 }
2012}
2013
2014impl<A: Actor> context::Actor for Context<'_, A> {
2015 type A = A;
2016 fn instance(&self) -> &Instance<A> {
2017 self
2018 }
2019}
2020
2021impl<A: Actor> context::Actor for &Instance<A> {
2022 type A = A;
2023 fn instance(&self) -> &Instance<A> {
2024 self
2025 }
2026}
2027
2028impl<A: Actor> context::Actor for &Context<'_, A> {
2029 type A = A;
2030 fn instance(&self) -> &Instance<A> {
2031 self
2032 }
2033}
2034
2035impl Instance<()> {
2036 pub fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
2038 assert!(
2039 self.actor_task_handle().is_none(),
2040 "can only bind actor port on instance with no running actor task"
2041 );
2042 self.inner.mailbox.bind_actor_port()
2043 }
2044}
2045
/// How an actor's type is identified.
#[derive(Debug)]
enum ActorType {
    /// Identified through static type information; the name comes from
    /// `TypeInfo::typename`.
    Named(&'static TypeInfo),
    /// Identified only by a static name string.
    Anonymous(&'static str),
}
2051
2052impl ActorType {
2053 fn type_name(&self) -> &str {
2054 match self {
2055 ActorType::Named(info) => info.typename(),
2056 ActorType::Anonymous(name) => name,
2057 }
2058 }
2059}
2060
/// A cloneable, reference-counted handle to the state shared by everything
/// that refers to one actor instance.
#[derive(Clone)]
pub struct InstanceCell {
    /// The shared state; cloning the cell clones only this `Arc`.
    inner: Arc<InstanceCellState>,
}
2070
2071impl fmt::Debug for InstanceCell {
2072 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2073 f.debug_struct("InstanceCell")
2074 .field("actor_id", &self.inner.actor_id)
2075 .field("actor_type", &self.inner.actor_type)
2076 .finish()
2077 }
2078}
2079
/// The state behind an `InstanceCell`.
struct InstanceCellState {
    /// The id of the actor this cell belongs to.
    actor_id: reference::ActorId,

    /// How the actor's type is identified; backs `actor_type_name`.
    actor_type: ActorType,

    /// The proc owning this instance. The cell is registered in the proc's
    /// instance table on creation and removed from it on drop.
    proc: Proc,

    /// Ports for signals and supervision events. `None` is treated as a
    /// "detached" actor: signals are dropped with a warning.
    actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,

    /// Receiver side of the actor's status watch channel.
    status: watch::Receiver<ActorStatus>,

    /// Weak link to the parent cell; never upgrades when there is no parent.
    parent: WeakInstanceCell,

    /// Linked children, keyed by their pid.
    children: DashMap<reference::Index, InstanceCell>,

    /// Join handle of the actor task; set at most once.
    actor_task_handle: OnceLock<JoinHandle<()>>,

    /// Bound ports (port index to type name), copied from `Ports::bound`
    /// when the cell is bound.
    exported_named_ports: DashMap<u64, &'static str>,

    /// Number of completed actor-loop iterations.
    num_processed_messages: AtomicU64,

    /// Wall-clock time at which this cell was constructed.
    created_at: SystemTime,

    /// Handler info of the most recently dispatched message.
    last_message_handler: RwLock<Option<HandlerInfo>>,

    /// Accumulated time spent inside message handlers, in microseconds.
    total_processing_time_us: AtomicU64,

    /// Telemetry recording; its span instruments message handlers.
    recording: Recording,

    /// Attributes published through `set_published_attrs` /
    /// `merge_published_attr`; `None` until first set.
    published_attrs: RwLock<Option<hyperactor_config::Attrs>>,

    /// Optional callback answering `query_child` requests.
    query_child_handler: RwLock<
        Option<Box<dyn (Fn(&crate::reference::Reference) -> IntrospectResult) + Send + Sync>>,
    >,

    /// The supervision event recorded when the actor terminated with a
    /// failure; `None` otherwise.
    supervision_event: std::sync::Mutex<Option<crate::supervision::ActorSupervisionEvent>>,

    /// Flag reported by `InstanceCell::is_system`; initialized to `false`.
    is_system: AtomicBool,

    /// Type-erased `Arc<Ports<A>>`, recovered by downcasting.
    ports: Arc<dyn Any + Send + Sync>,
}
2158
2159impl InstanceCellState {
2160 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
2163 self.parent
2164 .upgrade()
2165 .filter(|parent| parent.inner.unlink(self))
2166 }
2167
2168 fn unlink(&self, child: &InstanceCellState) -> bool {
2170 assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
2171 self.children.remove(&child.actor_id.pid()).is_some()
2172 }
2173}
2174
2175fn select_eviction_candidates(
2188 entries: &[(reference::ActorId, Option<String>)],
2189 excess: usize,
2190) -> Vec<reference::ActorId> {
2191 let mut clean: Vec<&reference::ActorId> = Vec::new();
2192 let mut failed: Vec<(&reference::ActorId, &str)> = Vec::new();
2193 for (id, occurred_at) in entries {
2194 match occurred_at {
2195 Some(ts) => failed.push((id, ts.as_str())),
2196 None => clean.push(id),
2197 }
2198 }
2199
2200 let mut to_remove: Vec<reference::ActorId> = Vec::new();
2201 let mut remaining = excess;
2202
2203 for id in clean {
2205 if remaining == 0 {
2206 break;
2207 }
2208 to_remove.push(id.clone());
2209 remaining -= 1;
2210 }
2211
2212 if remaining > 0 {
2214 failed.sort_by(|a, b| b.1.cmp(a.1));
2215 for (id, _) in failed.into_iter().take(remaining) {
2216 to_remove.push(id.clone());
2217 }
2218 }
2219
2220 to_remove
2221}
2222
2223impl InstanceCell {
2224 fn new(
2227 actor_id: reference::ActorId,
2228 actor_type: ActorType,
2229 proc: Proc,
2230 actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
2231 status: watch::Receiver<ActorStatus>,
2232 parent: Option<InstanceCell>,
2233 ports: Arc<dyn Any + Send + Sync>,
2234 ) -> Self {
2235 let _ais = actor_id.to_string();
2236 let cell = Self {
2237 inner: Arc::new(InstanceCellState {
2238 actor_id: actor_id.clone(),
2239 actor_type,
2240 proc: proc.clone(),
2241 actor_loop,
2242 status,
2243 parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
2244 children: DashMap::new(),
2245 actor_task_handle: OnceLock::new(),
2246 exported_named_ports: DashMap::new(),
2247 num_processed_messages: AtomicU64::new(0),
2248 created_at: std::time::SystemTime::now(),
2249 last_message_handler: RwLock::new(None),
2250 total_processing_time_us: AtomicU64::new(0),
2251 recording: hyperactor_telemetry::recorder().record(64),
2252 published_attrs: RwLock::new(None),
2253 query_child_handler: RwLock::new(None),
2254 supervision_event: std::sync::Mutex::new(None),
2255 is_system: AtomicBool::new(false),
2256 ports,
2257 }),
2258 };
2259 cell.maybe_link_parent();
2260 proc.inner
2261 .instances
2262 .insert(actor_id.clone(), cell.downgrade());
2263 cell
2264 }
2265
2266 fn wrap(inner: Arc<InstanceCellState>) -> Self {
2267 Self { inner }
2268 }
2269
2270 pub fn actor_id(&self) -> &reference::ActorId {
2272 &self.inner.actor_id
2273 }
2274
2275 pub(crate) fn pid(&self) -> reference::Index {
2277 self.inner.actor_id.pid()
2278 }
2279
2280 #[allow(dead_code)]
2282 pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
2283 self.inner.actor_task_handle.get()
2284 }
2285
2286 pub fn status(&self) -> &watch::Receiver<ActorStatus> {
2288 &self.inner.status
2289 }
2290
2291 pub fn supervision_event(&self) -> Option<crate::supervision::ActorSupervisionEvent> {
2294 self.inner.supervision_event.lock().unwrap().clone()
2295 }
2296
2297 pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
2299 if let Some((signal_port, _)) = &self.inner.actor_loop {
2300 static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
2302 let client = &CLIENT
2303 .get_or_init(|| Proc::runtime().instance("global_signal_client").unwrap())
2304 .0;
2305 signal_port.send(&client, signal).map_err(ActorError::from)
2306 } else {
2307 tracing::warn!(
2308 "{}: attempted to send signal {} to detached actor",
2309 self.inner.actor_id,
2310 signal
2311 );
2312 Ok(())
2313 }
2314 }
2315
2316 pub fn send_supervision_event_or_crash(
2325 &self,
2326 child_cx: &impl context::Actor, event: ActorSupervisionEvent,
2328 ) {
2329 match &self.inner.actor_loop {
2330 Some((_, supervision_port)) => {
2331 if let Err(err) = supervision_port.send(child_cx, event) {
2332 tracing::error!(
2333 "{}: failed to send supervision event to actor: {:?}. Crash the process.",
2334 self.actor_id(),
2335 err
2336 );
2337 std::process::exit(1);
2338 }
2339 }
2340 None => {
2341 tracing::error!(
2342 "{}: failed: {}: cannot send supervision event to detached actor: crashing",
2343 self.actor_id(),
2344 event,
2345 );
2346 std::process::exit(1);
2347 }
2348 }
2349 }
2350
2351 pub fn downgrade(&self) -> WeakInstanceCell {
2353 WeakInstanceCell {
2354 inner: Arc::downgrade(&self.inner),
2355 }
2356 }
2357
2358 fn link(&self, child: InstanceCell) {
2360 assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
2361 self.inner.children.insert(child.pid(), child);
2362 }
2363
2364 fn unlink(&self, child: &InstanceCell) {
2366 assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
2367 self.inner.children.remove(&child.pid());
2368 }
2369
2370 fn unlink_all(&self) {
2372 self.inner.children.clear();
2373 }
2374
2375 fn maybe_link_parent(&self) {
2377 if let Some(parent) = self.inner.parent.upgrade() {
2378 parent.link(self.clone());
2379 }
2380 }
2381
2382 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
2385 self.inner.maybe_unlink_parent()
2386 }
2387
2388 fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, reference::Index, InstanceCell>> {
2391 self.inner.children.iter()
2392 }
2393
2394 pub fn child_count(&self) -> usize {
2396 self.inner.children.len()
2397 }
2398
2399 pub fn child_actor_ids(&self) -> Vec<reference::ActorId> {
2401 self.inner
2402 .children
2403 .iter()
2404 .map(|entry| entry.value().actor_id().clone())
2405 .collect()
2406 }
2407
2408 fn get_child(&self, pid: reference::Index) -> Option<InstanceCell> {
2410 self.inner.children.get(&pid).map(|child| child.clone())
2411 }
2412
2413 pub fn recording(&self) -> &Recording {
2415 &self.inner.recording
2416 }
2417
2418 pub fn created_at(&self) -> SystemTime {
2420 self.inner.created_at
2421 }
2422
2423 pub fn num_processed_messages(&self) -> u64 {
2425 self.inner.num_processed_messages.load(Ordering::SeqCst)
2426 }
2427
2428 pub fn last_message_handler(&self) -> Option<HandlerInfo> {
2430 self.inner.last_message_handler.read().unwrap().clone()
2431 }
2432
2433 pub fn total_processing_time_us(&self) -> u64 {
2435 self.inner.total_processing_time_us.load(Ordering::SeqCst)
2436 }
2437
2438 pub fn parent(&self) -> Option<InstanceCell> {
2440 self.inner.parent.upgrade()
2441 }
2442
2443 pub fn actor_type_name(&self) -> &str {
2445 self.inner.actor_type.type_name()
2446 }
2447
2448 pub fn set_published_attrs(&self, attrs: hyperactor_config::Attrs) {
2450 *self.inner.published_attrs.write().unwrap() = Some(attrs);
2451 }
2452
2453 pub fn merge_published_attr<T: hyperactor_config::AttrValue>(
2456 &self,
2457 key: hyperactor_config::Key<T>,
2458 value: T,
2459 ) {
2460 self.inner
2461 .published_attrs
2462 .write()
2463 .unwrap()
2464 .get_or_insert_with(hyperactor_config::Attrs::new)
2465 .set(key, value);
2466 }
2467
2468 pub fn published_attrs(&self) -> Option<hyperactor_config::Attrs> {
2470 self.inner.published_attrs.read().unwrap().clone()
2471 }
2472
2473 pub fn set_query_child_handler(
2481 &self,
2482 handler: impl (Fn(&crate::reference::Reference) -> IntrospectResult) + Send + Sync + 'static,
2483 ) {
2484 *self.inner.query_child_handler.write().unwrap() = Some(Box::new(handler));
2485 }
2486
2487 pub fn query_child(&self, child_ref: &crate::reference::Reference) -> Option<IntrospectResult> {
2489 let guard = self.inner.query_child_handler.read().unwrap();
2490 guard.as_ref().map(|handler| handler(child_ref))
2491 }
2492
2493 pub fn is_system(&self) -> bool {
2495 self.inner.is_system.load(Ordering::Relaxed)
2496 }
2497
2498 pub fn store_terminated_snapshot(&self, payload: crate::introspect::IntrospectResult) {
2508 let snapshots = &self.inner.proc.inner.terminated_snapshots;
2509 snapshots.insert(self.actor_id().clone(), payload);
2510 let max = hyperactor_config::global::get(crate::config::TERMINATED_SNAPSHOT_RETENTION);
2511 let excess = snapshots.len().saturating_sub(max);
2512 if excess > 0 {
2513 let entries: Vec<_> = snapshots
2515 .iter()
2516 .map(|entry| {
2517 let occurred_at =
2518 serde_json::from_str::<hyperactor_config::Attrs>(&entry.value().attrs)
2519 .ok()
2520 .and_then(|attrs| {
2521 attrs
2523 .get(crate::introspect::FAILURE_ERROR_MESSAGE)
2524 .cloned()?;
2525 attrs
2527 .get(crate::introspect::FAILURE_OCCURRED_AT)
2528 .map(|t| humantime::format_rfc3339(*t).to_string())
2529 });
2530 (entry.key().clone(), occurred_at)
2531 })
2532 .collect();
2533
2534 for key in select_eviction_candidates(&entries, excess) {
2535 snapshots.remove(&key);
2536 }
2537 }
2538 }
2539
2540 pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &Ports<A>) -> reference::ActorRef<R> {
2543 <R as Binds<A>>::bind(ports);
2544 ports.bind::<Signal>();
2556 ports.bind::<Undeliverable<MessageEnvelope>>();
2557 for entry in ports.bound.iter() {
2559 self.inner
2560 .exported_named_ports
2561 .insert(*entry.key(), entry.value());
2562 }
2563 reference::ActorRef::attest(self.actor_id().clone())
2564 }
2565
2566 pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
2568 let ports = Arc::clone(&self.inner.ports).downcast::<Ports<A>>().ok()?;
2569 Some(ActorHandle::new(self.clone(), ports))
2570 }
2571
2572 pub fn traverse<F>(&self, f: &mut F)
2576 where
2577 F: FnMut(&InstanceCell, usize),
2578 {
2579 self.traverse_inner(0, f);
2580 }
2581
2582 fn traverse_inner<F>(&self, depth: usize, f: &mut F)
2583 where
2584 F: FnMut(&InstanceCell, usize),
2585 {
2586 f(self, depth);
2587 let mut children: Vec<_> = self.child_iter().map(|r| r.value().clone()).collect();
2589 children.sort_by_key(|c| c.pid());
2590 for child in children {
2591 child.traverse_inner(depth + 1, f);
2592 }
2593 }
2594}
2595
2596impl Drop for InstanceCellState {
2597 fn drop(&mut self) {
2598 if let Some(parent) = self.maybe_unlink_parent() {
2599 tracing::debug!(
2600 "instance {} was dropped with parent {} still linked",
2601 self.actor_id,
2602 parent.actor_id()
2603 );
2604 }
2605 if self.proc.inner.instances.remove(&self.actor_id).is_none() {
2606 tracing::error!("instance {} was dropped but not in proc", self.actor_id);
2607 }
2608 }
2609}
2610
/// A weak reference to an `InstanceCell`; it does not keep the cell's state
/// alive.
#[derive(Debug, Clone)]
pub struct WeakInstanceCell {
    /// Weak pointer to the shared cell state.
    inner: Weak<InstanceCellState>,
}
2617
2618impl Default for WeakInstanceCell {
2619 fn default() -> Self {
2620 Self::new()
2621 }
2622}
2623
2624impl WeakInstanceCell {
2625 pub fn new() -> Self {
2627 Self { inner: Weak::new() }
2628 }
2629
2630 pub fn upgrade(&self) -> Option<InstanceCell> {
2632 self.inner.upgrade().map(InstanceCell::wrap)
2633 }
2634}
2635
/// The set of message ports opened for an actor of type `A`.
pub struct Ports<A: Actor> {
    /// Opened ports keyed by message `TypeId`; each value is a type-erased
    /// `PortHandle<M>` for that message type.
    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
    /// Ports that have been bound: port index to message type name.
    bound: DashMap<u64, &'static str>,
    /// The mailbox on which ports are opened.
    mailbox: Mailbox,
    /// Sender onto the actor's work queue.
    workq: OrderedSender<WorkCell<A>>,
}
2646
2647impl<A: Actor> Ports<A> {
2648 fn new(mailbox: Mailbox, workq: OrderedSender<WorkCell<A>>) -> Self {
2649 Self {
2650 ports: DashMap::new(),
2651 bound: DashMap::new(),
2652 mailbox,
2653 workq,
2654 }
2655 }
2656
2657 pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
2659 where
2660 A: Handler<M>,
2661 {
2662 let key = TypeId::of::<M>();
2663 match self.ports.entry(key) {
2664 Entry::Vacant(entry) => {
2665 assert_ne!(
2667 key,
2668 TypeId::of::<Signal>(),
2669 "cannot provision Signal port through `Ports::get`"
2670 );
2671 assert_ne!(
2672 key,
2673 TypeId::of::<IntrospectMessage>(),
2674 "cannot provision IntrospectMessage port through `Ports::get`"
2675 );
2676
2677 let type_info = TypeInfo::get_by_typeid(key);
2678 let workq = self.workq.clone();
2679 let actor_id = self.mailbox.actor_id().to_string();
2680 let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
2681 let seq_info = headers.get(SEQ_INFO);
2682
2683 let work = WorkCell::new(move |actor: &mut A, instance: &Instance<A>| {
2684 Box::pin(async move {
2685 unsafe {
2687 instance
2688 .handle_message(actor, type_info, headers, msg)
2689 .await
2690 }
2691 })
2692 });
2693 ACTOR_MESSAGE_QUEUE_SIZE.add(
2694 1,
2695 hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.clone()),
2696 );
2697 if workq.enable_buffering {
2698 match seq_info {
2699 Some(SeqInfo::Session { session_id, seq }) => {
2700 workq.send(session_id, seq, work).map_err(|e| match e {
2703 OrderedSenderError::InvalidZeroSeq(_) => {
2704 let error_msg = format!(
2705 "in enqueue func for {}, got seq 0 for message type {}",
2706 actor_id,
2707 std::any::type_name::<M>(),
2708 );
2709 tracing::error!(error_msg);
2710 anyhow::anyhow!(error_msg)
2711 }
2712 OrderedSenderError::SendError(e) => anyhow::Error::from(e),
2713 OrderedSenderError::FlushError(e) => e,
2714 })
2715 }
2716 Some(SeqInfo::Direct) => {
2717 workq.direct_send(work).map_err(anyhow::Error::from)
2718 }
2719 None => {
2720 let error_msg = format!(
2721 "in enqueue func for {}, buffering is enabled, but SEQ_INFO is not set for message type {}",
2722 actor_id,
2723 std::any::type_name::<M>(),
2724 );
2725 tracing::error!(error_msg);
2726 anyhow::bail!(error_msg);
2727 }
2728 }
2729 } else {
2730 workq.direct_send(work).map_err(anyhow::Error::from)
2731 }
2732 });
2733 entry.insert(Box::new(port.clone()));
2734 port
2735 }
2736 Entry::Occupied(entry) => {
2737 let port = entry.get();
2738 port.downcast_ref::<PortHandle<M>>().unwrap().clone()
2739 }
2740 }
2741 }
2742
2743 pub(crate) fn open_message_port<M: Message>(&self) -> Option<(PortHandle<M>, PortReceiver<M>)> {
2746 match self.ports.entry(TypeId::of::<M>()) {
2747 Entry::Vacant(entry) => {
2748 let (port, receiver) = self.mailbox.open_port();
2749 entry.insert(Box::new(port.clone()));
2750 Some((port, receiver))
2751 }
2752 Entry::Occupied(_) => None,
2753 }
2754 }
2755
2756 pub fn bind<M: RemoteMessage>(&self)
2758 where
2759 A: Handler<M>,
2760 {
2761 let port_index = M::port();
2762 match self.bound.entry(port_index) {
2763 Entry::Vacant(entry) => {
2764 self.get::<M>().bind_actor_port();
2765 entry.insert(M::typename());
2766 }
2767 Entry::Occupied(entry) => {
2768 assert_eq!(
2769 *entry.get(),
2770 M::typename(),
2771 "bind {}: port index {} already bound to type {}",
2772 M::typename(),
2773 port_index,
2774 entry.get(),
2775 );
2776 }
2777 }
2778 }
2779}
2780
2781#[cfg(test)]
2782mod tests {
2783 use std::assert_matches::assert_matches;
2784 use std::sync::atomic::AtomicBool;
2785
2786 use hyperactor_macros::export;
2787 use serde_json::json;
2788 use timed_test::async_timed_test;
2789 use tokio::sync::Barrier;
2790 use tokio::sync::oneshot;
2791 use tracing::Level;
2792 use tracing_subscriber::layer::SubscriberExt;
2793 use tracing_test::internal::logs_with_scope_contain;
2794
2795 use super::*;
2796 use crate as hyperactor;
2798 use crate::HandleClient;
2799 use crate::Handler;
2800 use crate::testing::proc_supervison::ProcSupervisionCoordinator;
2801 use crate::testing::process_assertion::assert_termination;
2802
// Stateless actor used by the tests in this module.
#[derive(Debug, Default)]
#[export]
struct TestActor;
2806
// `TestActor` overrides nothing: every `Actor` method keeps its default.
impl Actor for TestActor {}
2808
// Messages understood by `TestActor`; each variant maps to the handler
// method of the same (snake_case) name below.
#[derive(Handler, HandleClient, Debug)]
enum TestActorMessage {
    // Acknowledge on the given channel.
    Reply(oneshot::Sender<()>),
    // Acknowledge on the sender, then block until the receiver fires.
    Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
    // Send the boxed message on to another `TestActor`.
    Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
    // Do nothing.
    Noop(),
    // Fail the handler with the given error.
    Fail(anyhow::Error),
    // Panic inside the handler with the given message.
    Panic(String),
    // Spawn a child `TestActor` and reply with its handle.
    Spawn(oneshot::Sender<ActorHandle<TestActor>>),
}
2819
2820 impl TestActor {
2821 async fn spawn_child(
2822 cx: &impl context::Actor,
2823 parent: &ActorHandle<TestActor>,
2824 ) -> ActorHandle<TestActor> {
2825 let (tx, rx) = oneshot::channel();
2826 parent.send(cx, TestActorMessage::Spawn(tx)).unwrap();
2827 rx.await.unwrap()
2828 }
2829 }
2830
// Handlers for `TestActorMessage`; one method per enum variant.
#[async_trait]
#[crate::handle(TestActorMessage)]
impl TestActorMessageHandler for TestActor {
    // Acknowledge by sending `()`; panics if the receiver was dropped.
    async fn reply(
        &mut self,
        _cx: &crate::Context<Self>,
        sender: oneshot::Sender<()>,
    ) -> Result<(), anyhow::Error> {
        sender.send(()).unwrap();
        Ok(())
    }

    // Signal entry via `sender`, then stay inside the handler until
    // `receiver` resolves. Lets tests observe the actor mid-message.
    async fn wait(
        &mut self,
        _cx: &crate::Context<Self>,
        sender: oneshot::Sender<()>,
        receiver: oneshot::Receiver<()>,
    ) -> Result<(), anyhow::Error> {
        sender.send(()).unwrap();
        receiver.await.unwrap();
        Ok(())
    }

    // Relay the boxed message to `destination`; a send error is
    // returned to the caller rather than panicking.
    async fn forward(
        &mut self,
        cx: &crate::Context<Self>,
        destination: ActorHandle<TestActor>,
        message: Box<TestActorMessage>,
    ) -> Result<(), anyhow::Error> {
        destination.send(cx, *message)?;
        Ok(())
    }

    // Do nothing, successfully.
    async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
        Ok(())
    }

    // Fail the handler with the supplied error.
    async fn fail(
        &mut self,
        _cx: &crate::Context<Self>,
        err: anyhow::Error,
    ) -> Result<(), anyhow::Error> {
        Err(err)
    }

    // Panic inside the handler with the supplied message.
    async fn panic(
        &mut self,
        _cx: &crate::Context<Self>,
        err_msg: String,
    ) -> Result<(), anyhow::Error> {
        panic!("{}", err_msg);
    }

    // Spawn a new `TestActor` using this handler's context and send its
    // handle back; panics if the reply receiver was dropped.
    async fn spawn(
        &mut self,
        cx: &crate::Context<Self>,
        reply: oneshot::Sender<ActorHandle<TestActor>>,
    ) -> Result<(), anyhow::Error> {
        let handle = TestActor.spawn(cx)?;
        reply.send(handle).unwrap();
        Ok(())
    }
}
2895
// Spawns an actor and follows its status through
// Idle -> Processing -> Idle -> Stopped, also checking the spawn log line.
#[tracing_test::traced_test]
#[async_timed_test(timeout_secs = 30)]
async fn test_spawn_actor() {
    let proc = Proc::local();
    let (client, _) = proc.instance("client").unwrap();
    let handle = proc.spawn("test", TestActor).unwrap();

    // Spawning must have logged the actor id together with its task handle.
    assert!(logs_contain(
        format!(
            "{}: spawned with {:?}",
            handle.actor_id(),
            handle.cell().actor_task_handle().unwrap(),
        )
        .as_str()
    ));

    // Own copy of the status watch, used to wait for transitions below.
    let mut state = handle.status().clone();

    // Round-trip one message so we know the actor is up and handling.
    let (tx, rx) = oneshot::channel::<()>();
    handle.send(&client, TestActorMessage::Reply(tx)).unwrap();
    rx.await.unwrap();

    state
        .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
        .await
        .unwrap();

    // `Wait` parks the handler between the two channels, so the status
    // can be inspected while the message is still being processed.
    let (enter_tx, enter_rx) = oneshot::channel::<()>();
    let (exit_tx, exit_rx) = oneshot::channel::<()>();

    handle
        .send(&client, TestActorMessage::Wait(enter_tx, exit_rx))
        .unwrap();
    enter_rx.await.unwrap();
    assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= std::time::SystemTime::now());
    // Release the handler.
    exit_tx.send(()).unwrap();

    state
        .wait_for(|state| matches!(*state, ActorStatus::Idle))
        .await
        .unwrap();

    // After a drain-and-stop the final status carries the stop reason.
    handle.drain_and_stop("test").unwrap();
    handle.await;
    assert_matches!(&*state.borrow(), ActorStatus::Stopped(reason) if reason == "test");
}
2946
2947 #[async_timed_test(timeout_secs = 30)]
2948 async fn test_proc_actors_messaging() {
2949 let proc = Proc::local();
2950 let (client, _) = proc.instance("client").unwrap();
2951 let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
2952 let second = proc.spawn::<TestActor>("second", TestActor).unwrap();
2953 let (tx, rx) = oneshot::channel::<()>();
2954 let reply_message = TestActorMessage::Reply(tx);
2955 first
2956 .send(
2957 &client,
2958 TestActorMessage::Forward(second, Box::new(reply_message)),
2959 )
2960 .unwrap();
2961 rx.await.unwrap();
2962 }
2963
/// Actor used by `test_actor_lookup` to resolve actor references from
/// inside a handler.
#[derive(Debug, Default)]
#[export]
struct LookupTestActor;

// Empty impl: every `Actor` method uses its default implementation.
impl Actor for LookupTestActor {}
2969
/// Messages understood by [`LookupTestActor`].
#[derive(Handler, HandleClient, Debug)]
enum LookupTestMessage {
    // Asks whether the referenced `TestActor` can be downcast to a local
    // handle; the boolean answer is delivered on the `#[reply]` port.
    ActorExists(
        reference::ActorRef<TestActor>,
        #[reply] reference::OncePortRef<bool>,
    ),
}
2977
2978 #[async_trait]
2979 #[crate::handle(LookupTestMessage)]
2980 impl LookupTestMessageHandler for LookupTestActor {
2981 async fn actor_exists(
2982 &mut self,
2983 cx: &crate::Context<Self>,
2984 actor_ref: reference::ActorRef<TestActor>,
2985 ) -> Result<bool, anyhow::Error> {
2986 Ok(actor_ref.downcast_handle(cx).is_some())
2987 }
2988 }
2989
2990 #[async_timed_test(timeout_secs = 30)]
2991 async fn test_actor_lookup() {
2992 let proc = Proc::local();
2993 let (client, _handle) = proc.instance("client").unwrap();
2994
2995 let target_actor = proc.spawn::<TestActor>("target", TestActor).unwrap();
2996 let target_actor_ref = target_actor.bind();
2997 let lookup_actor = proc
2998 .spawn::<LookupTestActor>("lookup", LookupTestActor)
2999 .unwrap();
3000
3001 assert!(
3002 lookup_actor
3003 .actor_exists(&client, target_actor_ref.clone())
3004 .await
3005 .unwrap()
3006 );
3007
3008 assert!(
3010 !lookup_actor
3011 .actor_exists(
3012 &client,
3013 reference::ActorRef::attest(target_actor.actor_id().child_id(123).clone())
3014 )
3015 .await
3016 .unwrap()
3017 );
3018 assert!(
3020 !lookup_actor
3021 .actor_exists(
3022 &client,
3023 reference::ActorRef::attest(lookup_actor.actor_id().clone())
3024 )
3025 .await
3026 .unwrap()
3027 );
3028
3029 target_actor.drain_and_stop("test").unwrap();
3030 target_actor.await;
3031
3032 assert!(
3033 !lookup_actor
3034 .actor_exists(&client, target_actor_ref)
3035 .await
3036 .unwrap()
3037 );
3038
3039 lookup_actor.drain_and_stop("test").unwrap();
3040 lookup_actor.await;
3041 }
3042
3043 fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
3044 assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id());
3045 assert_eq!(
3046 child.inner.parent.upgrade().unwrap().actor_id(),
3047 parent.actor_id()
3048 );
3049 assert_matches!(
3050 parent.inner.children.get(&child.pid()),
3051 Some(node) if node.actor_id() == child.actor_id()
3052 );
3053 }
3054
// Builds a three-level chain first -> second -> third and checks ids,
// parent/child links, spawn logging, and that each actor's children map
// is empty after it stops.
#[tracing_test::traced_test]
#[async_timed_test(timeout_secs = 30)]
async fn test_spawn_child() {
    let proc = Proc::local();
    let (client, _) = proc.instance("client").unwrap();

    let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
    let second = TestActor::spawn_child(&client, &first).await;
    let third = TestActor::spawn_child(&client, &second).await;

    // Every spawn logs the actor id and task handle under `hyperactor::proc`.
    assert!(logs_with_scope_contain(
        "hyperactor::proc",
        format!(
            "{}: spawned with {:?}",
            first.actor_id(),
            first.cell().actor_task_handle().unwrap()
        )
        .as_str()
    ));
    assert!(logs_with_scope_contain(
        "hyperactor::proc",
        format!(
            "{}: spawned with {:?}",
            second.actor_id(),
            second.cell().actor_task_handle().unwrap()
        )
        .as_str()
    ));
    assert!(logs_with_scope_contain(
        "hyperactor::proc",
        format!(
            "{}: spawned with {:?}",
            third.actor_id(),
            third.cell().actor_task_handle().unwrap()
        )
        .as_str()
    ));

    // Both descendants get ids derived from `first`'s id (indices 1 and
    // 2), even though `third` is a child of `second`.
    assert_eq!(first.actor_id().proc_id(), proc.proc_id());
    assert_eq!(second.actor_id(), &first.actor_id().child_id(1));
    assert_eq!(third.actor_id(), &first.actor_id().child_id(2));

    // Links hold at every level; the root has no parent.
    validate_link(third.cell(), second.cell());
    validate_link(second.cell(), first.cell());
    assert!(first.cell().inner.parent.upgrade().is_none());

    // Stop from the leaf upwards. A clone of each cell is kept so it can
    // still be inspected after the handle is consumed by `.await`.
    let third_cell = third.cell().clone();
    third.drain_and_stop("test").unwrap();
    third.await;
    assert!(third_cell.inner.children.is_empty());
    drop(third_cell);
    // Stopping the leaf leaves the second <-> first link intact.
    validate_link(second.cell(), first.cell());

    let second_cell = second.cell().clone();
    second.drain_and_stop("test").unwrap();
    second.await;
    assert!(second_cell.inner.children.is_empty());
    drop(second_cell);

    let first_cell = first.cell().clone();
    first.drain_and_stop("test").unwrap();
    first.await;
    assert!(first_cell.inner.children.is_empty());
}
3124
3125 #[async_timed_test(timeout_secs = 30)]
3126 async fn test_child_lifecycle() {
3127 let proc = Proc::local();
3128 let (client, _) = proc.instance("client").unwrap();
3129
3130 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3131 let root_1 = TestActor::spawn_child(&client, &root).await;
3132 let root_2 = TestActor::spawn_child(&client, &root).await;
3133 let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
3134
3135 root.drain_and_stop("test").unwrap();
3136 root.await;
3137
3138 for actor in [root_1, root_2, root_2_1] {
3139 assert!(actor.send(&client, TestActorMessage::Noop()).is_err());
3140 assert_matches!(actor.await, ActorStatus::Stopped(reason) if reason == "parent stopping");
3141 }
3142 }
3143
3144 #[async_timed_test(timeout_secs = 30)]
3145 async fn test_parent_failure() {
3146 let proc = Proc::local();
3147 let (client, _) = proc.instance("client").unwrap();
3148 let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3151
3152 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3153 let root_1 = TestActor::spawn_child(&client, &root).await;
3154 let root_2 = TestActor::spawn_child(&client, &root).await;
3155 let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
3156
3157 root_2
3158 .send(
3159 &client,
3160 TestActorMessage::Fail(anyhow::anyhow!("some random failure")),
3161 )
3162 .unwrap();
3163 let _root_2_actor_id = root_2.actor_id().clone();
3164 assert_matches!(
3165 root_2.await,
3166 ActorStatus::Failed(err) if err.to_string() == "some random failure"
3167 );
3168
3169 assert_matches!(
3173 root.await,
3174 ActorStatus::Failed(err) if err.to_string().contains("some random failure")
3175 );
3176 assert_matches!(root_2_1.await, ActorStatus::Stopped(_));
3177 assert_matches!(root_1.await, ActorStatus::Stopped(_));
3178 }
3179
3180 #[async_timed_test(timeout_secs = 30)]
3181 async fn test_multi_handler() {
3182 #[derive(Debug)]
3186 struct TestActor(Arc<AtomicUsize>);
3187
3188 #[async_trait]
3189 impl Actor for TestActor {}
3190
3191 #[async_trait]
3192 impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
3193 async fn handle(
3194 &mut self,
3195 cx: &crate::Context<Self>,
3196 message: OncePortHandle<PortHandle<usize>>,
3197 ) -> anyhow::Result<()> {
3198 message.send(cx, cx.port())?;
3199 Ok(())
3200 }
3201 }
3202
3203 #[async_trait]
3204 impl Handler<usize> for TestActor {
3205 async fn handle(
3206 &mut self,
3207 _cx: &crate::Context<Self>,
3208 message: usize,
3209 ) -> anyhow::Result<()> {
3210 self.0.fetch_add(message, Ordering::SeqCst);
3211 Ok(())
3212 }
3213 }
3214
3215 let proc = Proc::local();
3216 let state = Arc::new(AtomicUsize::new(0));
3217 let actor = TestActor(state.clone());
3218 let handle = proc.spawn::<TestActor>("test", actor).unwrap();
3219 let (client, _) = proc.instance("client").unwrap();
3220 let (tx, rx) = client.open_once_port();
3221 handle.send(&client, tx).unwrap();
3222 let usize_handle = rx.recv().await.unwrap();
3223 usize_handle.send(&client, 123).unwrap();
3224
3225 handle.drain_and_stop("test").unwrap();
3226 handle.await;
3227
3228 assert_eq!(state.load(Ordering::SeqCst), 123);
3229 }
3230
3231 #[async_timed_test(timeout_secs = 30)]
3232 async fn test_actor_panic() {
3233 panic_handler::set_panic_hook();
3235
3236 let proc = Proc::local();
3237 let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3240
3241 let (client, _handle) = proc.instance("client").unwrap();
3242 let actor_handle = proc.spawn("test", TestActor).unwrap();
3243 actor_handle
3244 .panic(&client, "some random failure".to_string())
3245 .await
3246 .unwrap();
3247 let actor_status = actor_handle.await;
3248
3249 assert_matches!(actor_status, ActorStatus::Failed(_));
3253 if let ActorStatus::Failed(err) = actor_status {
3254 let error_msg = err.to_string();
3255 assert!(error_msg.contains("some random failure"));
3257 assert!(error_msg.contains("library/std/src/panicking.rs"));
3261 }
3262 }
3263
3264 #[async_timed_test(timeout_secs = 30)]
3265 async fn test_local_supervision_propagation() {
3266 hyperactor_telemetry::initialize_logging_for_test();
3267
3268 #[derive(Debug)]
3269 struct TestActor(Arc<AtomicBool>, bool);
3270
3271 #[async_trait]
3272 impl Actor for TestActor {
3273 async fn handle_supervision_event(
3274 &mut self,
3275 _this: &Instance<Self>,
3276 _event: &ActorSupervisionEvent,
3277 ) -> Result<bool, anyhow::Error> {
3278 if !self.1 {
3279 return Ok(false);
3280 }
3281
3282 tracing::error!(
3283 "{}: supervision event received: {:?}",
3284 _this.self_id(),
3285 _event
3286 );
3287 self.0.store(true, Ordering::SeqCst);
3288 Ok(true)
3289 }
3290 }
3291
3292 #[async_trait]
3293 impl Handler<String> for TestActor {
3294 async fn handle(
3295 &mut self,
3296 cx: &crate::Context<Self>,
3297 message: String,
3298 ) -> anyhow::Result<()> {
3299 tracing::info!("{} received message: {}", cx.self_id(), message);
3300 Err(anyhow::anyhow!(message))
3301 }
3302 }
3303
3304 let proc = Proc::local();
3305 let (client, _) = proc.instance("client").unwrap();
3306 let (reported_event, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3307
3308 let root_state = Arc::new(AtomicBool::new(false));
3309 let root_1_state = Arc::new(AtomicBool::new(false));
3310 let root_1_1_state = Arc::new(AtomicBool::new(false));
3311 let root_1_1_1_state = Arc::new(AtomicBool::new(false));
3312 let root_2_state = Arc::new(AtomicBool::new(false));
3313 let root_2_1_state = Arc::new(AtomicBool::new(false));
3314
3315 let root = proc
3316 .spawn::<TestActor>("root", TestActor(root_state.clone(), false))
3317 .unwrap();
3318 let root_1 = proc
3319 .spawn_child::<TestActor>(
3320 root.cell().clone(),
3321 TestActor(
3322 root_1_state.clone(),
3323 true, ),
3325 )
3326 .unwrap();
3327 let root_1_1 = proc
3328 .spawn_child::<TestActor>(
3329 root_1.cell().clone(),
3330 TestActor(root_1_1_state.clone(), false),
3331 )
3332 .unwrap();
3333 let root_1_1_1 = proc
3334 .spawn_child::<TestActor>(
3335 root_1_1.cell().clone(),
3336 TestActor(root_1_1_1_state.clone(), false),
3337 )
3338 .unwrap();
3339 let root_2 = proc
3340 .spawn_child::<TestActor>(root.cell().clone(), TestActor(root_2_state.clone(), false))
3341 .unwrap();
3342 let root_2_1 = proc
3343 .spawn_child::<TestActor>(
3344 root_2.cell().clone(),
3345 TestActor(root_2_1_state.clone(), false),
3346 )
3347 .unwrap();
3348
3349 root_1_1_1
3352 .send::<String>(&client, "some random failure".into())
3353 .unwrap();
3354
3355 root_2_1
3358 .send::<String>(&client, "some random failure".into())
3359 .unwrap();
3360
3361 tokio::time::sleep(Duration::from_secs(1)).await;
3362
3363 assert!(!root_state.load(Ordering::SeqCst));
3364 assert!(root_1_state.load(Ordering::SeqCst));
3365 assert!(!root_1_1_state.load(Ordering::SeqCst));
3366 assert!(!root_1_1_1_state.load(Ordering::SeqCst));
3367 assert!(!root_2_state.load(Ordering::SeqCst));
3368 assert!(!root_2_1_state.load(Ordering::SeqCst));
3369 assert_eq!(
3370 reported_event.event().map(|e| e.actor_id.clone()),
3371 Some(root_2_1.actor_id().clone())
3372 );
3373 }
3374
3375 #[async_timed_test(timeout_secs = 30)]
3376 async fn test_instance() {
3377 #[derive(Debug, Default)]
3378 struct TestActor;
3379
3380 impl Actor for TestActor {}
3381
3382 #[async_trait]
3383 impl Handler<(String, reference::PortRef<String>)> for TestActor {
3384 async fn handle(
3385 &mut self,
3386 cx: &crate::Context<Self>,
3387 (message, port): (String, reference::PortRef<String>),
3388 ) -> anyhow::Result<()> {
3389 port.send(cx, message)?;
3390 Ok(())
3391 }
3392 }
3393
3394 let proc = Proc::local();
3395
3396 let (instance, handle) = proc.instance("my_test_actor").unwrap();
3397
3398 let child_actor = TestActor.spawn(&instance).unwrap();
3399
3400 let (port, mut receiver) = instance.open_port();
3401 child_actor
3402 .send(&instance, ("hello".to_string(), port.bind()))
3403 .unwrap();
3404
3405 let message = receiver.recv().await.unwrap();
3406 assert_eq!(message, "hello");
3407
3408 child_actor.drain_and_stop("test").unwrap();
3409 child_actor.await;
3410
3411 assert_eq!(*handle.status().borrow(), ActorStatus::Client);
3412 drop(instance);
3413 assert_matches!(*handle.status().borrow(), ActorStatus::Stopped(_));
3414 handle.await;
3415 }
3416
// Fails an actor on a proc that has no supervision coordinator and
// expects `assert_termination` to report exit code 1.
#[tokio::test]
async fn test_proc_terminate_without_coordinator() {
    // Skipped whenever `CARGO_TEST` is set; per the message below the
    // test hangs in that environment.
    if std::env::var("CARGO_TEST").is_ok() {
        eprintln!("test skipped as it hangs when run by cargo in sandcastle");
        return;
    }

    let process = async {
        let proc = Proc::local();
        let root = proc.spawn("root", TestActor).unwrap();
        let (client, _handle) = proc.instance("client").unwrap();
        root.fail(&client, anyhow::anyhow!("some random failure"))
            .await
            .unwrap();
        // Stay alive long enough for the failure to take effect.
        // NOTE(review): presumably the process is expected to exit before
        // this sleep finishes; confirm against `assert_termination`.
        tokio::time::sleep(Duration::from_secs(30)).await;
    };

    // NOTE(review): `1` looks like the expected exit code; confirm
    // against `assert_termination`'s signature.
    assert_termination(|| process, 1).await.unwrap();
}
3442
3443 fn trace_and_block(fut: impl Future) {
3444 tracing::subscriber::with_default(
3445 tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
3446 || {
3447 tokio::runtime::Builder::new_current_thread()
3448 .enable_all()
3449 .build()
3450 .unwrap()
3451 .block_on(fut)
3452 },
3453 );
3454 }
3455
// Checks that events and spans emitted inside handlers end up in the
// actor's recording. Currently ignored (see the attribute below).
#[ignore = "until trace recording is turned back on"]
#[test]
fn test_handler_logging() {
    #[derive(Debug, Default)]
    struct LoggingActor;

    impl Actor for LoggingActor {}

    impl LoggingActor {
        // Sends a two-party barrier to the actor and waits on it, so this
        // returns only once the actor's barrier handler is running.
        async fn wait(cx: &impl context::Actor, handle: &ActorHandle<Self>) {
            let barrier = Arc::new(Barrier::new(2));
            handle.send(cx, barrier.clone()).unwrap();
            barrier.wait().await;
        }
    }

    // Logs the string as an INFO event.
    #[async_trait]
    impl Handler<String> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: String,
        ) -> anyhow::Result<()> {
            tracing::info!("{}", message);
            Ok(())
        }
    }

    // Logs the number as an INFO event with a `number` field.
    #[async_trait]
    impl Handler<u64> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: u64,
        ) -> anyhow::Result<()> {
            tracing::event!(Level::INFO, number = message);
            Ok(())
        }
    }

    // Rendezvous with the sender on the barrier.
    #[async_trait]
    impl Handler<Arc<Barrier>> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: Arc<Barrier>,
        ) -> anyhow::Result<()> {
            message.wait().await;
            Ok(())
        }
    }

    // Enters a span named `child_span` and holds it between two
    // barriers, so the test can sample span stacks while it is entered.
    #[async_trait]
    impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            barriers: Arc<(Barrier, Barrier)>,
        ) -> anyhow::Result<()> {
            let inner = tracing::span!(Level::INFO, "child_span");
            let _inner_guard = inner.enter();
            barriers.0.wait().await;
            barriers.1.wait().await;
            Ok(())
        }
    }

    trace_and_block(async {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let handle = LoggingActor.spawn_detached().unwrap();
        handle.send(&client, "hello world".to_string()).unwrap();
        handle
            .send(&client, "hello world again".to_string())
            .unwrap();
        handle.send(&client, 123u64).unwrap();

        // Wait until the actor has reached the barrier message sent
        // after the three messages above.
        LoggingActor::wait(&client, &handle).await;

        // One recorded event per logging message, in send order.
        let events = handle.cell().inner.recording.tail();
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].json_value(), json!({ "message": "hello world" }));
        assert_eq!(
            events[1].json_value(),
            json!({ "message": "hello world again" })
        );
        assert_eq!(events[2].json_value(), json!({ "number": 123 }));

        // Sample the span stacks between the two barriers, i.e. while
        // the handler is inside `child_span`.
        let stacks = {
            let barriers = Arc::new((Barrier::new(2), Barrier::new(2)));
            handle.send(&client, Arc::clone(&barriers)).unwrap();
            barriers.0.wait().await;
            let stacks = handle.cell().inner.recording.stacks();
            barriers.1.wait().await;
            stacks
        };
        assert_eq!(stacks.len(), 1);
        assert_eq!(stacks[0].len(), 1);
        assert_eq!(stacks[0][0].name(), "child_span");
    })
}
3557
3558 #[async_timed_test(timeout_secs = 30)]
3559 async fn test_mailbox_closed_with_owner_stopped_reason() {
3560 use crate::actor::ActorStatus;
3561 use crate::mailbox::MailboxErrorKind;
3562 use crate::mailbox::MailboxSenderErrorKind;
3563
3564 let proc = Proc::local();
3565 let (client, _) = proc.instance("client").unwrap();
3566 let actor_handle = proc.spawn("test", TestActor).unwrap();
3567
3568 let handle_for_send = actor_handle.clone();
3570
3571 actor_handle.drain_and_stop("healthy shutdown").unwrap();
3573 actor_handle.await;
3574
3575 let result = handle_for_send.send(&client, TestActorMessage::Noop());
3577
3578 assert!(result.is_err(), "send should fail when actor is stopped");
3579 let err = result.unwrap_err();
3580 assert_matches!(
3581 err.kind(),
3582 MailboxSenderErrorKind::Mailbox(mailbox_err)
3583 if matches!(
3584 mailbox_err.kind(),
3585 MailboxErrorKind::OwnerTerminated(ActorStatus::Stopped(reason)) if reason == "healthy shutdown"
3586 )
3587 );
3588 }
3589
3590 #[async_timed_test(timeout_secs = 30)]
3591 async fn test_mailbox_closed_with_owner_failed_reason() {
3592 use crate::actor::ActorErrorKind;
3593 use crate::actor::ActorStatus;
3594 use crate::mailbox::MailboxErrorKind;
3595 use crate::mailbox::MailboxSenderErrorKind;
3596
3597 let proc = Proc::local();
3598 let (client, _) = proc.instance("client").unwrap();
3599 let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3602
3603 let actor_handle = proc.spawn("test", TestActor).unwrap();
3604
3605 let handle_for_send = actor_handle.clone();
3607
3608 actor_handle
3610 .send(
3611 &client,
3612 TestActorMessage::Fail(anyhow::anyhow!("intentional failure")),
3613 )
3614 .unwrap();
3615 actor_handle.await;
3616
3617 let result = handle_for_send.send(&client, TestActorMessage::Noop());
3619
3620 assert!(result.is_err(), "send should fail when actor has failed");
3621 let err = result.unwrap_err();
3622 assert_matches!(
3623 err.kind(),
3624 MailboxSenderErrorKind::Mailbox(mailbox_err)
3625 if matches!(
3626 mailbox_err.kind(),
3627 MailboxErrorKind::OwnerTerminated(ActorStatus::Failed(ActorErrorKind::Generic(msg)))
3628 if msg.contains("intentional failure")
3629 )
3630 );
3631 }
3632
3633 async fn wait_for_terminated_snapshot(
3637 proc: &Proc,
3638 actor_id: &reference::ActorId,
3639 ) -> crate::introspect::IntrospectResult {
3640 for i in 0..1000 {
3644 if let Some(snapshot) = proc.terminated_snapshot(actor_id) {
3645 return snapshot;
3646 }
3647 if i < 50 {
3648 tokio::task::yield_now().await;
3649 } else {
3650 tokio::time::sleep(Duration::from_millis(50)).await;
3651 }
3652 }
3653 panic!("timed out waiting for terminated snapshot for {}", actor_id);
3654 }
3655
3656 #[async_timed_test(timeout_secs = 30)]
3665 async fn test_terminated_snapshot_stored_on_stop() {
3666 let proc = Proc::local();
3667 let (_client, _client_handle) = proc.instance("client").unwrap();
3668
3669 let handle = proc.spawn::<TestActor>("actor", TestActor).unwrap();
3670 let actor_id = handle.actor_id().clone();
3671
3672 assert!(proc.terminated_snapshot(&actor_id).is_none());
3674 assert!(!proc.all_terminated_actor_ids().contains(&actor_id));
3675
3676 handle.drain_and_stop("test").unwrap();
3678 handle.await;
3679
3680 let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
3683 let attrs: hyperactor_config::Attrs =
3684 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
3685 let status = attrs
3686 .get(crate::introspect::STATUS)
3687 .expect("must have status");
3688 assert!(
3689 status.starts_with("stopped"),
3690 "expected stopped status, got: {}",
3691 status
3692 );
3693
3694 assert!(proc.all_terminated_actor_ids().contains(&actor_id));
3696 assert!(
3697 !proc.all_actor_ids().contains(&actor_id),
3698 "stopped actor should not appear in live actor IDs"
3699 );
3700 }
3701
3702 #[async_timed_test(timeout_secs = 30)]
3709 async fn test_terminated_snapshot_stored_on_failure() {
3710 let proc = Proc::local();
3711 let (client, _client_handle) = proc.instance("client").unwrap();
3712 ProcSupervisionCoordinator::set(&proc).await.unwrap();
3714
3715 let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
3716 let actor_id = handle.actor_id().clone();
3717
3718 handle
3720 .send(&client, TestActorMessage::Fail(anyhow::anyhow!("boom")))
3721 .unwrap();
3722 handle.await;
3723
3724 let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
3725 let attrs: hyperactor_config::Attrs =
3726 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
3727 let status = attrs
3728 .get(crate::introspect::STATUS)
3729 .expect("must have status");
3730 assert!(
3731 status.starts_with("failed"),
3732 "expected failed status, got: {}",
3733 status
3734 );
3735 }
3736
3737 #[async_timed_test(timeout_secs = 30)]
3739 async fn test_supervision_event_stored_on_failure() {
3740 let proc = Proc::local();
3741 let (client, _client_handle) = proc.instance("client").unwrap();
3742 ProcSupervisionCoordinator::set(&proc).await.unwrap();
3743
3744 let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
3745 let actor_id = handle.actor_id().clone();
3746 let cell = handle.cell().clone();
3747
3748 handle
3749 .send(&client, TestActorMessage::Fail(anyhow::anyhow!("boom")))
3750 .unwrap();
3751 handle.await;
3752
3753 let event = cell.supervision_event();
3754 assert!(event.is_some(), "failed actor must have supervision_event");
3755 let event = event.unwrap();
3756 assert_eq!(event.actor_id, actor_id);
3757 assert!(event.actor_status.is_failed());
3758 assert_eq!(event.actually_failing_actor().actor_id, actor_id);
3760 }
3761
3762 #[async_timed_test(timeout_secs = 30)]
3764 async fn test_supervision_event_none_on_clean_stop() {
3765 let proc = Proc::local();
3766 let (_client, _client_handle) = proc.instance("client").unwrap();
3767
3768 let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
3769 let cell = handle.cell().clone();
3770
3771 handle.drain_and_stop("test").unwrap();
3772 handle.await;
3773
3774 assert!(
3775 cell.supervision_event().is_none(),
3776 "cleanly stopped actor must have no supervision_event"
3777 );
3778 }
3779
3780 #[async_timed_test(timeout_secs = 30)]
3782 async fn test_supervision_event_on_propagated_failure() {
3783 let proc = Proc::local();
3784 let (client, _client_handle) = proc.instance("client").unwrap();
3785 ProcSupervisionCoordinator::set(&proc).await.unwrap();
3786
3787 let parent = proc.spawn::<TestActor>("parent", TestActor).unwrap();
3788 let parent_cell = parent.cell().clone();
3789 let (tx, rx) = oneshot::channel();
3791 parent.send(&client, TestActorMessage::Spawn(tx)).unwrap();
3792 let child = rx.await.unwrap();
3793 let child_id = child.actor_id().clone();
3794
3795 child
3798 .send(
3799 &client,
3800 TestActorMessage::Fail(anyhow::anyhow!("child boom")),
3801 )
3802 .unwrap();
3803 parent.await;
3804
3805 let event = parent_cell.supervision_event();
3806 assert!(
3807 event.is_some(),
3808 "parent must have supervision_event from propagated failure"
3809 );
3810 let event = event.unwrap();
3811 assert_eq!(event.actually_failing_actor().actor_id, child_id);
3813 }
3814
3815 #[async_timed_test(timeout_secs = 30)]
3824 async fn test_resolve_actor_ref_none_for_terminal_actor() {
3825 let proc = Proc::local();
3826 let (_client, _client_handle) = proc.instance("client").unwrap();
3827
3828 let handle = proc.spawn::<TestActor>("target", TestActor).unwrap();
3829 let actor_ref: reference::ActorRef<TestActor> = handle.bind();
3830
3831 assert!(
3833 proc.resolve_actor_ref(&actor_ref).is_some(),
3834 "live actor should be resolvable"
3835 );
3836
3837 handle.drain_and_stop("test").unwrap();
3838 handle.await;
3839
3840 assert!(
3843 proc.resolve_actor_ref(&actor_ref).is_none(),
3844 "terminal actor must not be resolvable"
3845 );
3846 }
3847
3848 #[async_timed_test(timeout_secs = 30)]
3850 async fn test_terminated_snapshot_has_failure_info() {
3851 let proc = Proc::local();
3852 let (client, _client_handle) = proc.instance("client").unwrap();
3853 ProcSupervisionCoordinator::set(&proc).await.unwrap();
3854
3855 let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
3856 let actor_id = handle.actor_id().clone();
3857
3858 handle
3859 .send(&client, TestActorMessage::Fail(anyhow::anyhow!("kaboom")))
3860 .unwrap();
3861 handle.await;
3862
3863 let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
3864 let attrs: hyperactor_config::Attrs =
3865 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
3866 let status = attrs
3867 .get(crate::introspect::STATUS)
3868 .expect("must have status");
3869 assert!(
3870 status.starts_with("failed"),
3871 "expected failed status, got: {}",
3872 status
3873 );
3874 let err_msg = attrs
3875 .get(crate::introspect::FAILURE_ERROR_MESSAGE)
3876 .expect("failed actor must have failure_error_message");
3877 assert!(!err_msg.is_empty());
3878 let root_cause = attrs
3879 .get(crate::introspect::FAILURE_ROOT_CAUSE_ACTOR)
3880 .expect("must have root_cause_actor");
3881 assert_eq!(root_cause, &actor_id.to_string());
3882 assert_eq!(
3883 attrs.get(crate::introspect::FAILURE_IS_PROPAGATED),
3884 Some(&false)
3885 );
3886 assert!(
3887 attrs.get(crate::introspect::FAILURE_OCCURRED_AT).is_some(),
3888 "failed actor must have occurred_at"
3889 );
3890 }
3891
3892 #[async_timed_test(timeout_secs = 30)]
3894 async fn test_propagated_failure_info() {
3895 let proc = Proc::local();
3896 let (client, _client_handle) = proc.instance("client").unwrap();
3897 ProcSupervisionCoordinator::set(&proc).await.unwrap();
3898
3899 let parent = proc.spawn::<TestActor>("parent", TestActor).unwrap();
3900 let parent_id = parent.actor_id().clone();
3901
3902 let (tx, rx) = oneshot::channel();
3903 parent.send(&client, TestActorMessage::Spawn(tx)).unwrap();
3904 let child = rx.await.unwrap();
3905 let child_id = child.actor_id().clone();
3906
3907 child
3908 .send(
3909 &client,
3910 TestActorMessage::Fail(anyhow::anyhow!("child fail")),
3911 )
3912 .unwrap();
3913 parent.await;
3914
3915 let snapshot = wait_for_terminated_snapshot(&proc, &parent_id).await;
3916 let attrs: hyperactor_config::Attrs =
3917 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
3918 let root_cause = attrs
3919 .get(crate::introspect::FAILURE_ROOT_CAUSE_ACTOR)
3920 .expect("propagated failure must have root_cause_actor");
3921 assert_eq!(root_cause, &child_id.to_string());
3922 assert_eq!(
3923 attrs.get(crate::introspect::FAILURE_IS_PROPAGATED),
3924 Some(&true)
3925 );
3926 }
3927
3928 #[async_timed_test(timeout_secs = 30)]
3930 async fn test_spawn_with_name_creates_descriptive_name() {
3931 let proc = Proc::local();
3932 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3933 let handle = proc
3934 .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
3935 .unwrap();
3936 assert_eq!(handle.actor_id().name(), "my_controller");
3937 assert_eq!(handle.actor_id().pid(), 1);
3938 }
3939
3940 #[async_timed_test(timeout_secs = 30)]
3942 async fn test_spawn_with_name_increments_index() {
3943 let proc = Proc::local();
3944 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3945 let first = proc
3946 .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
3947 .unwrap();
3948 let second = proc
3949 .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
3950 .unwrap();
3951 assert_eq!(first.actor_id().pid(), 1);
3952 assert_eq!(second.actor_id().pid(), 2);
3953 }
3954
3955 #[async_timed_test(timeout_secs = 30)]
3958 async fn test_spawn_with_name_preserves_supervision() {
3959 let proc = Proc::local();
3960 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3961 let child = proc
3962 .spawn_named_child(root.cell().clone(), "supervised_child", TestActor)
3963 .unwrap();
3964 let child_cell = child.cell();
3965 let parent = child_cell.parent().expect("named child must have a parent");
3966 assert_eq!(parent.actor_id(), root.actor_id());
3967 }
3968
3969 #[async_timed_test(timeout_secs = 30)]
3971 async fn test_spawn_unchanged() {
3972 let proc = Proc::local();
3973 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3974 let child = proc.spawn_child(root.cell().clone(), TestActor).unwrap();
3975 assert_eq!(child.actor_id().name(), root.actor_id().name());
3976 }
3977
3978 #[async_timed_test(timeout_secs = 30)]
3980 async fn test_spawn_with_name_different_names_different_pids() {
3981 let proc = Proc::local();
3982 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3983 let a = proc
3984 .spawn_named_child(root.cell().clone(), "controller_a", TestActor)
3985 .unwrap();
3986 let b = proc
3987 .spawn_named_child(root.cell().clone(), "controller_b", TestActor)
3988 .unwrap();
3989 assert_ne!(a.actor_id().pid(), b.actor_id().pid());
3990 assert_eq!(a.actor_id().name(), "controller_a");
3991 assert_eq!(b.actor_id().name(), "controller_b");
3992 }
3993
3994 #[async_timed_test(timeout_secs = 30)]
3996 async fn test_spawn_with_name_no_child_overwrite() {
3997 let proc = Proc::local();
3998 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3999 let _a = proc
4000 .spawn_named_child(root.cell().clone(), "ctrl", TestActor)
4001 .unwrap();
4002 let _b = proc
4003 .spawn_named_child(root.cell().clone(), "ctrl", TestActor)
4004 .unwrap();
4005 let _c = proc.spawn_child(root.cell().clone(), TestActor).unwrap();
4006 assert_eq!(root.cell().child_count(), 3);
4007 }
4008
4009 #[async_timed_test(timeout_secs = 30)]
4011 async fn test_spawn_with_name_does_not_pollute_roots() {
4012 let proc = Proc::local();
4013 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4014 let _child = proc
4015 .spawn_named_child(root.cell().clone(), "foo", TestActor)
4016 .unwrap();
4017 let result = proc.spawn::<TestActor>("foo", TestActor);
4020 assert!(result.is_ok(), "named child should not pollute roots");
4021 }
4022
4023 #[async_timed_test(timeout_secs = 30)]
4025 async fn test_ai3_controller_actor_ids_unique_across_parents_same_proc() {
4026 let proc = Proc::local();
4027 let parent_a = proc.spawn::<TestActor>("parent_a", TestActor).unwrap();
4028 let parent_b = proc.spawn::<TestActor>("parent_b", TestActor).unwrap();
4029
4030 let ctrl_a = proc
4032 .spawn_named_child(parent_a.cell().clone(), "controller_mesh_a", TestActor)
4033 .unwrap();
4034 let ctrl_b = proc
4035 .spawn_named_child(parent_b.cell().clone(), "controller_mesh_b", TestActor)
4036 .unwrap();
4037
4038 assert_ne!(
4039 ctrl_a.actor_id(),
4040 ctrl_b.actor_id(),
4041 "controller ActorIds must be unique across parents"
4042 );
4043 }
4044
4045 #[async_timed_test(timeout_secs = 30)]
4047 async fn test_ai3_no_controller_overwrite_in_parent_or_proc_maps() {
4048 let proc = Proc::local();
4049 let parent_a = proc.spawn::<TestActor>("parent_a", TestActor).unwrap();
4050 let parent_b = proc.spawn::<TestActor>("parent_b", TestActor).unwrap();
4051
4052 let ctrl_a = proc
4053 .spawn_named_child(parent_a.cell().clone(), "controller_mesh_a", TestActor)
4054 .unwrap();
4055 let ctrl_b = proc
4056 .spawn_named_child(parent_b.cell().clone(), "controller_mesh_b", TestActor)
4057 .unwrap();
4058
4059 assert!(
4061 proc.get_instance(ctrl_a.actor_id()).is_some(),
4062 "ctrl_a must be resolvable"
4063 );
4064 assert!(
4065 proc.get_instance(ctrl_b.actor_id()).is_some(),
4066 "ctrl_b must be resolvable"
4067 );
4068 assert_eq!(parent_a.cell().child_count(), 1);
4070 assert_eq!(parent_b.cell().child_count(), 1);
4071 }
4072
4073 #[async_timed_test(timeout_secs = 30)]
4075 async fn test_stopped_snapshot_has_no_failure_info() {
4076 let proc = Proc::local();
4077 let (_client, _client_handle) = proc.instance("client").unwrap();
4078
4079 let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
4080 let actor_id = handle.actor_id().clone();
4081
4082 handle.drain_and_stop("test").unwrap();
4083 handle.await;
4084
4085 let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
4086 let attrs: hyperactor_config::Attrs =
4087 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
4088 let status = attrs
4089 .get(crate::introspect::STATUS)
4090 .expect("must have status");
4091 assert!(
4092 status.starts_with("stopped"),
4093 "expected stopped, got: {}",
4094 status
4095 );
4096 assert!(
4097 attrs
4098 .get(crate::introspect::FAILURE_ERROR_MESSAGE)
4099 .is_none(),
4100 "stopped actor must not have failure attrs"
4101 );
4102 }
4103}