1use std::any::Any;
39use std::any::TypeId;
40use std::collections::HashMap;
41use std::fmt;
42use std::future::Future;
43use std::ops::Deref;
44use std::panic;
45use std::panic::AssertUnwindSafe;
46use std::panic::Location;
47use std::pin::Pin;
48use std::sync::Arc;
49use std::sync::OnceLock;
50use std::sync::RwLock;
51use std::sync::Weak;
52use std::sync::atomic::AtomicBool;
53use std::sync::atomic::AtomicU64;
54use std::sync::atomic::AtomicUsize;
55use std::sync::atomic::Ordering;
56use std::time::Duration;
57use std::time::Instant;
58use std::time::SystemTime;
59
60use async_trait::async_trait;
61use dashmap::DashMap;
62use dashmap::mapref::entry::Entry;
63use dashmap::mapref::multiple::RefMulti;
64use futures::FutureExt;
65use hyperactor_config::Flattrs;
66use hyperactor_telemetry::ActorStatusEvent;
67use hyperactor_telemetry::generate_actor_status_event_id;
68use hyperactor_telemetry::hash_to_u64;
69use hyperactor_telemetry::notify_actor_status_changed;
70use hyperactor_telemetry::notify_message;
71use hyperactor_telemetry::notify_message_status;
72use hyperactor_telemetry::recorder::Recording;
73use tokio::sync::mpsc;
74use tokio::sync::watch;
75use tokio::task::JoinHandle;
76use tracing::Instrument;
77use tracing::Span;
78use typeuri::Named;
79use uuid::Uuid;
80use wirevalue::TypeInfo;
81
82use crate as hyperactor;
83use crate::Actor;
84use crate::Handler;
85use crate::Message;
86use crate::RemoteMessage;
87use crate::actor::ActorError;
88use crate::actor::ActorErrorKind;
89use crate::actor::ActorHandle;
90use crate::actor::ActorStatus;
91use crate::actor::Binds;
92use crate::actor::HandlerInfo;
93use crate::actor::Referable;
94use crate::actor::RemoteHandles;
95use crate::actor::Signal;
96use crate::actor_local::ActorLocalStorage;
97use crate::channel;
98use crate::channel::ChannelAddr;
99use crate::channel::ChannelError;
100use crate::channel::ChannelTransport;
101use crate::config;
102use crate::context;
103use crate::context::Mailbox as _;
104use crate::introspect::IntrospectMessage;
105use crate::introspect::IntrospectResult;
106use crate::mailbox::BoxedMailboxSender;
107use crate::mailbox::DeliveryError;
108use crate::mailbox::DialMailboxRouter;
109use crate::mailbox::IntoBoxedMailboxSender as _;
110use crate::mailbox::Mailbox;
111use crate::mailbox::MailboxMuxer;
112use crate::mailbox::MailboxSender;
113use crate::mailbox::MailboxServer as _;
114use crate::mailbox::MessageEnvelope;
115use crate::mailbox::OncePortHandle;
116use crate::mailbox::OncePortReceiver;
117use crate::mailbox::PanickingMailboxSender;
118use crate::mailbox::PortHandle;
119use crate::mailbox::PortReceiver;
120use crate::mailbox::Undeliverable;
121use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
122use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
123use crate::metrics::ACTOR_MESSAGES_RECEIVED;
124use crate::ordering::OrderedSender;
125use crate::ordering::OrderedSenderError;
126use crate::ordering::SEQ_INFO;
127use crate::ordering::SeqInfo;
128use crate::ordering::Sequencer;
129use crate::ordering::ordered_channel;
130use crate::panic_handler;
131use crate::reference;
132use crate::supervision::ActorSupervisionEvent;
133
/// Counter used by [`Proc::local`] to give each local proc a distinct
/// `local_{rank}` name.
static NEXT_LOCAL_RANK: AtomicUsize = AtomicUsize::new(0);
136
/// A proc hosts actors and routes messages for them.
///
/// `Proc` is a cheap, cloneable handle: every clone shares the same
/// [`ProcState`] through an `Arc`. Envelopes addressed to this proc are
/// delivered through its local muxer; all others go to its forwarder.
#[derive(Clone)]
pub struct Proc {
    inner: Arc<ProcState>,
}
147
148impl fmt::Debug for Proc {
149 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150 f.debug_struct("Proc")
151 .field("proc_id", &self.inner.proc_id)
152 .finish()
153 }
154}
155
/// Shared state behind every clone of a [`Proc`].
struct ProcState {
    /// The id of this proc.
    proc_id: reference::ProcId,

    /// Muxer for this proc's own mailboxes; envelopes whose destination proc
    /// equals `proc_id` are posted here.
    proc_muxer: MailboxMuxer,

    /// Sender for envelopes addressed to any other proc.
    forwarder: BoxedMailboxSender,

    /// Root actor names allocated on this proc. Each name maps to the next
    /// pid to hand out to a child allocated under that name (starts at 1).
    roots: DashMap<String, AtomicUsize>,

    /// Weak references to instance cells on this proc, keyed by actor id.
    instances: DashMap<reference::ActorId, WeakInstanceCell>,

    /// Introspection snapshots of terminated actors, keyed by actor id.
    /// NOTE(review): this map is populated outside this section — confirm where.
    terminated_snapshots: DashMap<reference::ActorId, crate::introspect::IntrospectResult>,

    /// Port that receives supervision events no actor handled. It is set at
    /// most once, via `Proc::set_supervision_coordinator`.
    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,

    /// Actor id of the owner of `supervision_coordinator_port`.
    supervision_coordinator_actor_id: OnceLock<reference::ActorId>,

    /// Handle to the mailbox server started by `Proc::direct`. It is taken
    /// and stopped by `Proc::join_mailbox_server`.
    mailbox_server_handle: std::sync::Mutex<Option<crate::mailbox::MailboxServerHandle>>,
}
197
198impl Drop for ProcState {
199 fn drop(&mut self) {
200 tracing::info!(
204 proc_id = %self.proc_id,
205 name = "ProcStatus",
206 status = "Dropped"
207 );
208 }
209}
210
/// An instance created by [`Proc::actor_instance`], returned together with
/// the receivers that would otherwise be consumed by an actor loop.
pub struct ActorInstance<A: Actor> {
    /// The instance itself (its status is set to `ActorStatus::Client`).
    pub instance: Instance<A>,
    /// A handle to the instance.
    pub handle: ActorHandle<A>,
    /// Receiver for supervision events delivered to the instance.
    pub supervision: PortReceiver<ActorSupervisionEvent>,
    /// Receiver for signals delivered to the instance.
    pub signal: PortReceiver<Signal>,
    /// Receiver for work items queued to the instance.
    pub work: mpsc::UnboundedReceiver<WorkCell<A>>,
}
227
228impl Proc {
229 pub fn configured(proc_id: reference::ProcId, forwarder: BoxedMailboxSender) -> Self {
231 tracing::info!(
232 proc_id = %proc_id,
233 name = "ProcStatus",
234 status = "Created"
235 );
236
237 Self {
238 inner: Arc::new(ProcState {
239 proc_id,
240 proc_muxer: MailboxMuxer::new(),
241 forwarder,
242 roots: DashMap::new(),
243 instances: DashMap::new(),
244 terminated_snapshots: DashMap::new(),
245 supervision_coordinator_port: OnceLock::new(),
246 supervision_coordinator_actor_id: OnceLock::new(),
247 mailbox_server_handle: std::sync::Mutex::new(None),
248 }),
249 }
250 }
251
252 pub fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
254 let (addr, rx) = channel::serve(addr)?;
255 let proc_id = reference::ProcId::with_name(addr, name);
256 let proc = Self::configured(proc_id, DialMailboxRouter::new().into_boxed());
257 let handle = proc.clone().serve(rx);
258 *proc.inner.mailbox_server_handle.lock().unwrap() = Some(handle);
259 Ok(proc)
260 }
261
262 pub fn set_supervision_coordinator(
265 &self,
266 port: PortHandle<ActorSupervisionEvent>,
267 ) -> Result<(), anyhow::Error> {
268 let actor_id = port.location().actor_id().clone();
269 self.state()
270 .supervision_coordinator_port
271 .set(port)
272 .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))?;
273 let _ = self.state().supervision_coordinator_actor_id.set(actor_id);
274 Ok(())
275 }
276
277 pub fn supervision_coordinator_actor_id(&self) -> Option<&reference::ActorId> {
280 self.state().supervision_coordinator_actor_id.get()
281 }
282
283 pub fn handle_unhandled_supervision_event(
286 &self,
287 cx: &impl context::Actor,
288 event: ActorSupervisionEvent,
289 ) {
290 let result = match self.state().supervision_coordinator_port.get() {
291 Some(port) => port.send(cx, event.clone()).map_err(anyhow::Error::from),
292 None => {
293 if !event.is_error() {
294 return;
297 }
298 Err(anyhow::anyhow!(
299 "coordinator port is not set for proc {}",
300 self.proc_id(),
301 ))
302 }
303 };
304 if let Err(err) = result {
305 if !event.is_error() {
306 tracing::debug!(
309 "proc {}: dropping non-error supervision event {}: {:?}",
310 self.proc_id(),
311 event,
312 err
313 );
314 return;
315 }
316 tracing::error!(
317 "proc {}: could not propagate supervision event {} due to error: {:?}: crashing",
318 self.proc_id(),
319 event,
320 err
321 );
322
323 std::process::exit(1);
324 }
325 }
326
327 pub fn local() -> Self {
330 let rank = NEXT_LOCAL_RANK.fetch_add(1, Ordering::Relaxed);
331 let addr = ChannelAddr::any(ChannelTransport::Local);
332 let proc_id = reference::ProcId::unique(addr, format!("local_{}", rank));
333 Proc::configured(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
334 }
335
336 pub fn proc_id(&self) -> &reference::ProcId {
338 &self.state().proc_id
339 }
340
341 pub fn forwarder(&self) -> &BoxedMailboxSender {
344 &self.inner.forwarder
345 }
346
347 fn state(&self) -> &ProcState {
349 self.inner.as_ref()
350 }
351
352 pub(crate) fn runtime() -> &'static Proc {
354 static RUNTIME_PROC: OnceLock<Proc> = OnceLock::new();
355 RUNTIME_PROC.get_or_init(|| {
356 let addr = ChannelAddr::any(ChannelTransport::Local);
357 let proc_id = reference::ProcId::unique(addr, "hyperactor_runtime");
358 Proc::configured(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
359 })
360 }
361
362 pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
364 let actor_id: reference::ActorId = self.allocate_root_id(name)?;
365 Ok(self.bind_mailbox(actor_id))
366 }
367
368 pub fn attach_child(&self, parent_id: &reference::ActorId) -> Result<Mailbox, anyhow::Error> {
370 let actor_id: reference::ActorId = self.allocate_child_id(parent_id)?;
371 Ok(self.bind_mailbox(actor_id))
372 }
373
374 fn bind_mailbox(&self, actor_id: reference::ActorId) -> Mailbox {
376 let mbox = Mailbox::new(actor_id, BoxedMailboxSender::new(self.downgrade()));
377
378 self.state().proc_muxer.bind_mailbox(mbox.clone());
381 mbox
382 }
383
384 pub fn attach_actor<R, M>(
387 &self,
388 name: &str,
389 ) -> Result<(Instance<()>, reference::ActorRef<R>, PortReceiver<M>), anyhow::Error>
390 where
391 M: RemoteMessage,
392 R: Referable + RemoteHandles<M>,
393 {
394 let (instance, _handle) = self.instance(name)?;
395 let (_handle, rx) = instance.bind_actor_port::<M>();
396 let actor_ref = reference::ActorRef::attest(instance.self_id().clone());
397 Ok((instance, actor_ref, rx))
398 }
399
400 pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> Result<ActorHandle<A>, anyhow::Error> {
403 let actor_id = self.allocate_root_id(name)?;
404 self.spawn_inner(actor_id, actor, None)
405 }
406
407 #[hyperactor::instrument(fields(actor_id = actor_id.to_string(), actor_name = actor_id.name(), actor_type = std::any::type_name::<A>()))]
410 fn spawn_inner<A: Actor>(
411 &self,
412 actor_id: reference::ActorId,
413 actor: A,
414 parent: Option<InstanceCell>,
415 ) -> Result<ActorHandle<A>, anyhow::Error> {
416 let (instance, receivers) = Instance::new(self.clone(), actor_id, false, parent);
417 Ok(instance.start(actor, receivers))
418 }
419
420 pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
425 let actor_id = self.allocate_root_id(name)?;
426 let (instance, _receivers) = Instance::new(self.clone(), actor_id, false, None);
427 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
428 instance.change_status(ActorStatus::Client);
429 Ok((instance, handle))
430 }
431
432 pub fn introspectable_instance(
445 &self,
446 name: &str,
447 ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
448 let actor_id = self.allocate_root_id(name)?;
449 let (instance, receivers) = Instance::new(self.clone(), actor_id, false, None);
450 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
451 instance.change_status(ActorStatus::Client);
452 tokio::spawn(crate::introspect::serve_introspect(
453 instance.inner.cell.clone(),
454 instance.inner.mailbox.clone(),
455 receivers.introspect,
456 ));
457 Ok((instance, handle))
458 }
459
460 pub fn actor_instance<A: Actor>(&self, name: &str) -> Result<ActorInstance<A>, anyhow::Error> {
466 let actor_id = self.allocate_root_id(name)?;
467 let span = tracing::debug_span!(
468 "actor_instance",
469 actor_name = name,
470 actor_type = std::any::type_name::<A>(),
471 actor_id = actor_id.to_string(),
472 );
473 let _guard = span.enter();
474 let (instance, receivers) = Instance::new(self.clone(), actor_id.clone(), false, None);
475 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
476 instance.change_status(ActorStatus::Client);
477
478 tokio::spawn(crate::introspect::serve_introspect(
479 instance.inner.cell.clone(),
480 instance.inner.mailbox.clone(),
481 receivers.introspect,
482 ));
483
484 let (signal_rx, supervision_rx) = receivers.actor_loop.unwrap();
485 Ok(ActorInstance {
486 instance,
487 handle,
488 supervision: supervision_rx,
489 signal: signal_rx,
490 work: receivers.work,
491 })
492 }
493
494 pub fn traverse<F>(&self, f: &mut F)
503 where
504 F: FnMut(&InstanceCell, usize),
505 {
506 for entry in self.state().instances.iter() {
507 if entry.key().pid() == 0 {
508 if let Some(cell) = entry.value().upgrade() {
509 cell.traverse(f);
510 }
511 }
512 }
513 }
514
515 pub fn get_instance(&self, actor_id: &reference::ActorId) -> Option<InstanceCell> {
517 self.state()
518 .instances
519 .get(actor_id)
520 .and_then(|weak| weak.upgrade())
521 }
522
523 pub fn root_actor_ids(&self) -> Vec<reference::ActorId> {
532 self.state()
533 .instances
534 .iter()
535 .filter(|entry| entry.key().pid() == 0)
536 .map(|entry| entry.key().clone())
537 .collect()
538 }
539
540 pub fn all_actor_ids(&self) -> Vec<reference::ActorId> {
549 self.state()
550 .instances
551 .iter()
552 .filter(|entry| {
553 entry
554 .value()
555 .upgrade()
556 .is_some_and(|cell| !cell.status().borrow().is_terminal())
557 })
558 .map(|entry| entry.key().clone())
559 .collect()
560 }
561
562 pub fn all_instance_keys(&self) -> Vec<reference::ActorId> {
574 self.state()
575 .instances
576 .iter()
577 .map(|entry| entry.key().clone())
578 .collect()
579 }
580
581 pub fn terminated_snapshot(
583 &self,
584 actor_id: &reference::ActorId,
585 ) -> Option<crate::introspect::IntrospectResult> {
586 self.state()
587 .terminated_snapshots
588 .get(actor_id)
589 .map(|e| e.value().clone())
590 }
591
592 pub fn all_terminated_actor_ids(&self) -> Vec<reference::ActorId> {
594 self.state()
595 .terminated_snapshots
596 .iter()
597 .map(|e| e.key().clone())
598 .collect()
599 }
600
601 fn child_instance(
603 &self,
604 parent: InstanceCell,
605 ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
606 let actor_id = self.allocate_child_id(parent.actor_id())?;
607 let _ = tracing::debug_span!(
608 "child_actor_instance",
609 parent_actor_id = %parent.actor_id(),
610 actor_type = std::any::type_name::<()>(),
611 actor_id = %actor_id,
612 );
613
614 let (instance, _receivers) = Instance::new(self.clone(), actor_id, false, Some(parent));
615 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
618 instance.change_status(ActorStatus::Client);
619 Ok((instance, handle))
620 }
621
622 pub(crate) fn spawn_child<A: Actor>(
628 &self,
629 parent: InstanceCell,
630 actor: A,
631 ) -> Result<ActorHandle<A>, anyhow::Error> {
632 let actor_id = self.allocate_child_id(parent.actor_id())?;
633 self.spawn_inner(actor_id, actor, Some(parent))
634 }
635
636 pub(crate) fn spawn_named_child<A: Actor>(
640 &self,
641 parent: InstanceCell,
642 name: &str,
643 actor: A,
644 ) -> Result<ActorHandle<A>, anyhow::Error> {
645 let actor_id = self.allocate_named_child_id(parent.actor_id(), name)?;
646 self.spawn_inner(actor_id, actor, Some(parent))
647 }
648
649 pub fn abort_root_actor(
653 &self,
654 root: &reference::ActorId,
655 this_handle: Option<&JoinHandle<()>>,
656 ) -> Option<impl Future<Output = reference::ActorId>> {
657 self.state()
658 .instances
659 .get(root)
660 .into_iter()
661 .flat_map(|e| e.upgrade())
662 .map(|cell| {
663 let r1 = root.clone();
664 let r2 = root.clone();
665 let skip_abort = this_handle.is_some_and(|this_h| {
667 cell.inner
668 .actor_task_handle
669 .get()
670 .is_some_and(|other_h| std::ptr::eq(this_h, other_h))
671 });
672 async move {
676 tokio::task::spawn_blocking(move || {
677 if !skip_abort {
678 let h = cell.inner.actor_task_handle.wait();
679 tracing::debug!("{}: aborting {:?}", r1, h);
680 h.abort();
681 }
682 })
683 .await
684 .unwrap();
685 r2
686 }
687 })
688 .next()
689 }
690
691 pub fn stop_actor(
694 &self,
695 actor_id: &reference::ActorId,
696 reason: String,
697 ) -> Option<watch::Receiver<ActorStatus>> {
698 if let Some(entry) = self.state().instances.get(actor_id) {
699 match entry.value().upgrade() {
700 None => None, Some(cell) => {
702 tracing::info!("sending stop signal to {}", cell.actor_id());
703 if let Err(err) = cell.signal(Signal::DrainAndStop(reason)) {
704 tracing::error!(
705 "{}: failed to send stop signal to pid {}: {:?}",
706 self.proc_id(),
707 cell.pid(),
708 err
709 );
710 None
711 } else {
712 Some(cell.status().clone())
713 }
714 }
715 }
716 } else {
717 tracing::error!("no actor {} found in {}", actor_id, self.proc_id());
718 None
719 }
720 }
721
722 pub async fn destroy_and_wait<A: Actor>(
730 &mut self,
731 timeout: Duration,
732 cx: Option<&Context<'_, A>>,
733 reason: &str,
734 ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error> {
735 self.destroy_and_wait_except_current::<A>(timeout, cx, false, reason)
736 .await
737 }
738
739 #[hyperactor::instrument]
749 pub async fn destroy_and_wait_except_current<A: Actor>(
750 &mut self,
751 timeout: Duration,
752 cx: Option<&Context<'_, A>>,
753 except_current: bool,
754 reason: &str,
755 ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error> {
756 tracing::debug!("{}: proc stopping", self.proc_id());
757
758 let (this_handle, this_actor_id) = cx.map_or((None, None), |cx| {
759 (
760 Some(cx.actor_task_handle().expect("cannot call destroy_and_wait from inside an actor unless actor has finished starting")),
761 Some(cx.self_id())
762 )
763 });
764
765 let coordinator_id = self.supervision_coordinator_actor_id().cloned();
766
767 let mut statuses = HashMap::new();
770 for actor_id in self
771 .state()
772 .instances
773 .iter()
774 .filter(|entry| entry.key().pid() == 0)
775 .map(|entry| entry.key().clone())
776 .collect::<Vec<_>>()
777 {
778 if coordinator_id.as_ref() == Some(&actor_id) {
779 continue;
780 }
781 if let Some(status) = self.stop_actor(&actor_id, reason.to_string()) {
782 statuses.insert(actor_id, status);
783 }
784 }
785 tracing::debug!("{}: non-coordinator actors stopped", self.proc_id());
786
787 let waits: Vec<_> = statuses
788 .iter_mut()
789 .filter(|(actor_id, _)| Some(*actor_id) != this_actor_id)
790 .map(|(actor_id, root)| {
791 let actor_id = actor_id.clone();
792 async move {
793 tokio::time::timeout(
794 timeout,
795 root.wait_for(|state: &ActorStatus| state.is_terminal()),
796 )
797 .await
798 .ok()
799 .map(|_| actor_id)
800 }
801 })
802 .collect();
803
804 let results = futures::future::join_all(waits).await;
805 let mut stopped_actors: Vec<_> = results
806 .iter()
807 .filter_map(|actor_id| actor_id.as_ref())
808 .cloned()
809 .collect();
810 let aborted_actors: Vec<_> = statuses
811 .iter()
812 .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
813 .map(|(actor_id, _)| {
814 let f = self.abort_root_actor(actor_id, this_handle);
815 async move {
816 let _ = if let Some(f) = f { Some(f.await) } else { None };
817 actor_id.clone()
822 }
823 })
824 .collect();
825 let mut aborted_actors = futures::future::join_all(aborted_actors).await;
826
827 if let Some(ref coord_id) = coordinator_id
831 && this_actor_id != Some(coord_id)
832 {
833 if let Some(mut status) = self.stop_actor(coord_id, reason.to_string()) {
834 let stopped = tokio::time::timeout(
835 timeout,
836 status.wait_for(|s: &ActorStatus| s.is_terminal()),
837 )
838 .await
839 .is_ok();
840 if stopped {
841 stopped_actors.push(coord_id.clone());
842 } else {
843 if let Some(f) = self.abort_root_actor(coord_id, this_handle) {
844 f.await;
845 }
846 aborted_actors.push(coord_id.clone());
847 }
848 }
849 }
850
851 let flush_timeout = hyperactor_config::global::get(crate::config::FORWARDER_FLUSH_TIMEOUT);
858 match tokio::time::timeout(flush_timeout, self.state().forwarder.flush()).await {
859 Ok(Err(err)) => {
860 tracing::warn!(
861 "{}: forwarder flush failed during proc exit: {:?}",
862 self.proc_id(),
863 err
864 );
865 }
866 Err(_elapsed) => {
867 tracing::warn!(
868 "{}: forwarder flush timed out during proc exit",
869 self.proc_id(),
870 );
871 }
872 Ok(Ok(())) => {}
873 }
874
875 if let Some(this_handle) = this_handle
876 && let Some(this_actor_id) = this_actor_id
877 && !except_current
878 {
879 tracing::debug!("{}: aborting (delayed) {:?}", this_actor_id, this_handle);
880 this_handle.abort()
881 };
882
883 tracing::info!(
884 "destroy_and_wait: {} actors stopped, {} actors aborted",
885 stopped_actors.len(),
886 aborted_actors.len()
887 );
888 Ok((stopped_actors, aborted_actors))
889 }
890
891 pub fn resolve_actor_ref<R: Actor + Referable>(
913 &self,
914 actor_ref: &reference::ActorRef<R>,
915 ) -> Option<ActorHandle<R>> {
916 let cell = self.inner.instances.get(actor_ref.actor_id())?.upgrade()?;
917 if cell.status().borrow().is_terminal() {
921 return None;
922 }
923 cell.downcast_handle()
924 }
925
926 fn allocate_root_id(&self, name: &str) -> Result<reference::ActorId, anyhow::Error> {
928 let name = name.to_string();
929 match self.state().roots.entry(name.to_string()) {
930 Entry::Vacant(entry) => {
931 entry.insert(AtomicUsize::new(1));
932 }
933 Entry::Occupied(_) => {
934 anyhow::bail!("an actor with name '{}' has already been spawned", name)
935 }
936 }
937 Ok(reference::ActorId::new(
938 self.state().proc_id.clone(),
939 name.to_string(),
940 0,
941 ))
942 }
943
944 #[hyperactor::instrument(fields(actor_name=parent_id.name()))]
946 pub(crate) fn allocate_child_id(
947 &self,
948 parent_id: &reference::ActorId,
949 ) -> Result<reference::ActorId, anyhow::Error> {
950 assert_eq!(*parent_id.proc_id(), self.state().proc_id);
951 let pid = match self.state().roots.get(parent_id.name()) {
952 None => anyhow::bail!(
953 "no actor named {} in proc {}",
954 parent_id.name(),
955 self.state().proc_id
956 ),
957 Some(next_pid) => next_pid.fetch_add(1, Ordering::Relaxed),
958 };
959 Ok(parent_id.child_id(pid))
960 }
961
962 pub(crate) fn allocate_named_child_id(
967 &self,
968 parent_id: &reference::ActorId,
969 name: &str,
970 ) -> Result<reference::ActorId, anyhow::Error> {
971 let inherited = self.allocate_child_id(parent_id)?;
972 Ok(reference::ActorId::new(
973 inherited.proc_id().clone(),
974 name,
975 inherited.pid(),
976 ))
977 }
978
979 pub fn downgrade(&self) -> WeakProc {
981 WeakProc::new(self)
982 }
983
984 pub async fn flush(&self) -> Result<(), anyhow::Error> {
988 self.state().forwarder.flush().await
989 }
990
991 pub async fn join_mailbox_server(&self) {
1000 let handle = self.inner.mailbox_server_handle.lock().unwrap().take();
1001 if let Some(handle) = handle {
1002 handle.stop("proc shutting down");
1003 let _ = handle.await;
1004 }
1005 }
1006}
1007
1008#[async_trait]
1009impl MailboxSender for Proc {
1010 fn post_unchecked(
1011 &self,
1012 envelope: MessageEnvelope,
1013 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1014 ) {
1015 if envelope.dest().actor_id().proc_id() == &self.state().proc_id {
1016 self.state().proc_muxer.post(envelope, return_handle)
1017 } else {
1018 self.state().forwarder.post(envelope, return_handle)
1019 }
1020 }
1021
1022 async fn flush(&self) -> Result<(), anyhow::Error> {
1023 let (r1, r2) = futures::future::join(
1024 self.state().proc_muxer.flush(),
1025 self.state().forwarder.flush(),
1026 )
1027 .await;
1028 r1?;
1029 r2?;
1030 Ok(())
1031 }
1032}
1033
/// A weak reference to a proc's shared state that does not keep the proc
/// alive; see [`Proc::downgrade`].
#[derive(Clone, Debug)]
pub struct WeakProc(Weak<ProcState>);
1037
1038impl WeakProc {
1039 fn new(proc: &Proc) -> Self {
1040 Self(Arc::downgrade(&proc.inner))
1041 }
1042
1043 pub fn upgrade(&self) -> Option<Proc> {
1045 self.0.upgrade().map(|inner| Proc { inner })
1046 }
1047}
1048
1049#[async_trait]
1050impl MailboxSender for WeakProc {
1051 fn post_unchecked(
1052 &self,
1053 envelope: MessageEnvelope,
1054 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1055 ) {
1056 match self.upgrade() {
1057 Some(proc) => proc.post(envelope, return_handle),
1058 None => envelope.undeliverable(
1059 DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
1060 return_handle,
1061 ),
1062 }
1063 }
1064
1065 async fn flush(&self) -> Result<(), anyhow::Error> {
1066 match self.upgrade() {
1067 Some(proc) => proc.flush().await,
1068 None => Ok(()),
1069 }
1070 }
1071}
1072
/// A boxed, one-shot unit of work to run against an actor.
///
/// The closure receives the actor mutably along with its [`Instance`], and
/// returns a boxed future that may borrow both.
pub struct WorkCell<A: Actor + Send>(
    Box<
        dyn for<'a> FnOnce(
                &'a mut A,
                &'a Instance<A>,
            )
                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
            + Send
            + Sync,
    >,
);
1086
1087impl<A: Actor + Send> WorkCell<A> {
1088 fn new(
1090 f: impl for<'a> FnOnce(
1091 &'a mut A,
1092 &'a Instance<A>,
1093 )
1094 -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
1095 + Send
1096 + Sync
1097 + 'static,
1098 ) -> Self {
1099 Self(Box::new(f))
1100 }
1101
1102 pub fn handle<'a>(
1104 self,
1105 actor: &'a mut A,
1106 instance: &'a Instance<A>,
1107 ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
1108 (self.0)(actor, instance)
1109 }
1110}
1111
/// A borrowed [`Instance`] together with a set of headers. It derefs to the
/// instance.
pub struct Context<'a, A: Actor> {
    instance: &'a Instance<A>,
    headers: Flattrs,
}
1117
1118impl<'a, A: Actor> Context<'a, A> {
1119 pub fn new(instance: &'a Instance<A>, headers: Flattrs) -> Self {
1121 Self { instance, headers }
1122 }
1123
1124 pub fn headers(&self) -> &Flattrs {
1126 &self.headers
1127 }
1128}
1129
1130impl<A: Actor> Deref for Context<'_, A> {
1131 type Target = Instance<A>;
1132
1133 fn deref(&self) -> &Self::Target {
1134 self.instance
1135 }
1136}
1137
/// A running (or client) actor instance; all of its state lives in a shared
/// [`InstanceState`].
pub struct Instance<A: Actor> {
    inner: Arc<InstanceState<A>>,
}
1144
1145impl<A: Actor> fmt::Debug for Instance<A> {
1146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1147 f.debug_struct("Instance").field("inner", &"..").finish()
1148 }
1149}
1150
/// Shared state of an [`Instance`].
struct InstanceState<A: Actor> {
    /// The proc hosting this instance.
    proc: Proc,

    /// This instance's cell; clones are given to handles created from it.
    cell: InstanceCell,

    /// This instance's mailbox; its actor id is the instance's id.
    mailbox: Mailbox,

    /// This instance's ports, including the sender side of its work queue.
    ports: Arc<Ports<A>>,

    /// Sender side of the status watch channel; the receiver is held by `cell`.
    status_tx: watch::Sender<ActorStatus>,

    /// A UUIDv7 generated when the instance is created; it also seeds `sequencer`.
    id: Uuid,

    /// Sequencer created from `id`.
    sequencer: Sequencer,

    /// Actor-local storage for this instance.
    instance_locals: ActorLocalStorage,
}
1175
1176impl<A: Actor> InstanceState<A> {
1177 fn self_id(&self) -> &reference::ActorId {
1178 self.mailbox.actor_id()
1179 }
1180}
1181
1182impl<A: Actor> Drop for InstanceState<A> {
1183 fn drop(&mut self) {
1184 self.status_tx.send_if_modified(|status| {
1185 if status.is_terminal() {
1186 false
1187 } else {
1188 tracing::info!(
1189 name = "ActorStatus",
1190 actor_id = %self.self_id(),
1191 actor_name = self.self_id().name(),
1192 status = "Stopped",
1193 prev_status = status.arm().unwrap_or("unknown"),
1194 "instance is dropped",
1195 );
1196 *status = ActorStatus::Stopped("instance is dropped".into());
1197 true
1198 }
1199 });
1200 }
1201}
1202
/// Receive sides created alongside an [`Instance`] by `Instance::new`.
pub struct InstanceReceivers<A: Actor> {
    /// Signal and supervision receivers for the actor loop; `None` for
    /// detached instances.
    actor_loop: Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,
    /// Receiver for the instance's work queue.
    work: mpsc::UnboundedReceiver<WorkCell<A>>,
    /// Receiver for introspection messages.
    introspect: PortReceiver<IntrospectMessage>,
}
1218
1219impl<A: Actor> Instance<A> {
    /// Creates a new instance for `actor_id` on `proc` and returns it with
    /// the receive sides that the actor loop (or the caller) must consume.
    ///
    /// The mailbox sends through a weak reference to `proc` and is bound in
    /// the proc's muxer. When `detached` is false, a signal port and a
    /// supervision port are opened: their senders go to the cell and their
    /// receivers to [`InstanceReceivers::actor_loop`]. When `detached` is
    /// true, that field is `None`. An introspection port is always opened and
    /// bound as an actor port.
    ///
    /// NOTE(review): ports are opened in a fixed order (signal, supervision,
    /// introspection); presumably port indices depend on it — confirm before
    /// reordering.
    fn new(
        proc: Proc,
        actor_id: reference::ActorId,
        detached: bool,
        parent: Option<InstanceCell>,
    ) -> (Self, InstanceReceivers<A>) {
        let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
        // Work queue; the flag comes from ENABLE_DEST_ACTOR_REORDERING_BUFFER
        // (presumably toggles a reordering buffer — confirm in `ordering`).
        let (work_tx, work_rx) = ordered_channel(
            actor_id.to_string(),
            hyperactor_config::global::get(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER),
        );
        let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
        proc.state().proc_muxer.bind_mailbox(mailbox.clone());
        let (status_tx, status_rx) = watch::channel(ActorStatus::Created);

        // Use the registered type info when available; otherwise fall back to
        // the Rust type name.
        let actor_type = match TypeInfo::of::<A>() {
            Some(info) => ActorType::Named(info),
            None => ActorType::Anonymous(std::any::type_name::<A>()),
        };
        let actor_loop_ports = if detached {
            None
        } else {
            let (signal_port, signal_receiver) = ports.open_message_port().unwrap();
            let (supervision_port, supervision_receiver) = mailbox.open_port();
            Some((
                (signal_port, supervision_port),
                (signal_receiver, supervision_receiver),
            ))
        };

        // Split into the send side (given to the cell) and the receive side
        // (returned to the caller).
        let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();

        let (introspect_port, introspect_receiver) =
            ports.open_message_port::<IntrospectMessage>().unwrap();
        introspect_port.bind_actor_port();

        let cell = InstanceCell::new(
            actor_id,
            actor_type,
            proc.clone(),
            actor_loop,
            status_rx,
            parent,
            ports.clone(),
        );
        let instance_id = Uuid::now_v7();
        let inner = Arc::new(InstanceState {
            proc,
            cell,
            mailbox,
            ports,
            status_tx,
            sequencer: Sequencer::new(instance_id),
            id: instance_id,
            instance_locals: ActorLocalStorage::new(),
        });
        (
            Self { inner },
            InstanceReceivers {
                actor_loop: actor_loop_receivers,
                work: work_rx,
                introspect: introspect_receiver,
            },
        )
    }
1293 }
1294
1295 #[track_caller]
1298 pub fn change_status(&self, new: ActorStatus) {
1299 let old = self.inner.status_tx.send_replace(new.clone());
1300 assert!(
1306 !old.is_terminal() && !new.is_terminal()
1307 || !old.is_terminal() && new.is_terminal()
1308 || old == new,
1309 "actor changing status illegally, only allow non-terminal -> non-terminal \
1310 and non-terminal -> terminal statuses. actor_id={}, prev_status={}, status={}",
1311 self.self_id(),
1312 old,
1313 new
1314 );
1315 if !((old.is_idle() && new.is_processing())
1320 || (old.is_processing() && new.is_idle())
1321 || old == new)
1322 {
1323 let new_status = new.arm().unwrap_or("unknown");
1324 let change_reason = match &new {
1325 ActorStatus::Failed(reason) => reason.to_string(),
1326 ActorStatus::Stopped(reason) => reason.clone(),
1327 _ => "".to_string(),
1328 };
1329 tracing::info!(
1330 name = "ActorStatus",
1331 actor_id = %self.self_id(),
1332 actor_name = self.self_id().name(),
1333 status = new_status,
1334 prev_status = old.arm().unwrap_or("unknown"),
1335 caller = %Location::caller(),
1336 change_reason,
1337 );
1338 let actor_id = hash_to_u64(self.self_id());
1339 notify_actor_status_changed(ActorStatusEvent {
1340 id: generate_actor_status_event_id(actor_id),
1341 timestamp: std::time::SystemTime::now(),
1342 actor_id,
1343 new_status: new_status.to_string(),
1344 reason: if change_reason.is_empty() {
1345 None
1346 } else {
1347 Some(change_reason)
1348 },
1349 });
1350 }
1351 }
1352
1353 fn is_terminal(&self) -> bool {
1354 self.inner.status_tx.borrow().is_terminal()
1355 }
1356
1357 fn is_stopping(&self) -> bool {
1358 self.inner.status_tx.borrow().is_stopping()
1359 }
1360
1361 pub fn self_id(&self) -> &reference::ActorId {
1363 self.inner.self_id()
1364 }
1365
1366 pub fn introspect_payload(&self) -> crate::introspect::IntrospectResult {
1382 crate::introspect::live_actor_payload(&self.inner.cell)
1383 }
1384
1385 pub fn publish_attrs(&self, attrs: hyperactor_config::Attrs) {
1393 #[cfg(debug_assertions)]
1394 {
1395 use std::collections::HashSet;
1396 use std::sync::OnceLock;
1397
1398 use hyperactor_config::attrs::AttrKeyInfo;
1399
1400 static INTROSPECT_KEYS: OnceLock<HashSet<&'static str>> = OnceLock::new();
1401 let allowed = INTROSPECT_KEYS.get_or_init(|| {
1402 inventory::iter::<AttrKeyInfo>()
1403 .filter(|info| info.meta.get(hyperactor_config::INTROSPECT).is_some())
1404 .map(|info| info.name)
1405 .collect()
1406 });
1407 for (name, _) in attrs.iter() {
1408 debug_assert!(
1409 allowed.contains(name),
1410 "publish_attrs: key {:?} is not tagged with INTROSPECT",
1411 name
1412 );
1413 }
1414 }
1415 self.inner.cell.set_published_attrs(attrs);
1416 }
1417
1418 pub fn publish_attr<T: hyperactor_config::AttrValue>(
1424 &self,
1425 key: hyperactor_config::Key<T>,
1426 value: T,
1427 ) {
1428 debug_assert!(
1429 key.attrs().get(hyperactor_config::INTROSPECT).is_some(),
1430 "publish_attr called with non-introspection key: {}",
1431 key.name()
1432 );
1433 self.inner.cell.merge_published_attr(key, value);
1434 }
1435
1436 pub fn set_system(&self) {
1439 self.inner
1440 .cell
1441 .inner
1442 .is_system
1443 .store(true, Ordering::Relaxed);
1444 }
1445
1446 pub fn set_query_child_handler(
1455 &self,
1456 handler: impl (Fn(&crate::reference::Reference) -> IntrospectResult) + Send + Sync + 'static,
1457 ) {
1458 self.inner.cell.set_query_child_handler(handler);
1459 }
1460
1461 pub fn stop(&self, reason: &str) -> Result<(), ActorError> {
1463 tracing::info!(
1464 actor_id = %self.inner.cell.actor_id(),
1465 reason,
1466 "Instance::stop called",
1467 );
1468 self.inner
1469 .cell
1470 .signal(Signal::DrainAndStop(reason.to_string()))
1471 }
1472
1473 pub fn abort(&self, reason: &str) -> Result<(), ActorError> {
1475 tracing::info!(
1476 actor_id = %self.inner.cell.actor_id(),
1477 reason,
1478 "Instance::abort called",
1479 );
1480 self.inner.cell.signal(Signal::Abort(reason.to_string()))
1481 }
1482
1483 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1488 self.inner.mailbox.open_port()
1489 }
1490
1491 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1495 self.inner.mailbox.open_once_port()
1496 }
1497
1498 pub fn locals(&self) -> &ActorLocalStorage {
1500 &self.inner.instance_locals
1501 }
1502
1503 pub fn post(&self, port_id: reference::PortId, headers: Flattrs, message: wirevalue::Any) {
1505 <Self as context::MailboxExt>::post(
1506 self,
1507 port_id,
1508 headers,
1509 message,
1510 true,
1511 context::SeqInfoPolicy::AssignNew,
1512 )
1513 }
1514
1515 #[doc(hidden)]
1521 pub fn post_with_external_seq_info(
1522 &self,
1523 port_id: reference::PortId,
1524 headers: Flattrs,
1525 message: wirevalue::Any,
1526 ) {
1527 <Self as context::MailboxExt>::post(
1528 self,
1529 port_id,
1530 headers,
1531 message,
1532 true,
1533 context::SeqInfoPolicy::AllowExternal,
1534 )
1535 }
1536
1537 pub fn self_client() -> &'static Instance<()> {
1543 static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
1544 &CLIENT
1545 .get_or_init(|| Proc::runtime().instance("self_message_client").unwrap())
1546 .0
1547 }
1548
1549 pub fn self_message_with_delay<M>(&self, message: M, delay: Duration) -> Result<(), ActorError>
1551 where
1552 M: Message,
1553 A: Handler<M>,
1554 {
1555 let client = Self::self_client();
1556 let port = self.port();
1557 let self_id = self.self_id().clone();
1558 tokio::spawn(async move {
1559 tokio::time::sleep(delay).await;
1560 if let Err(e) = port.send(&client, message) {
1561 tracing::info!("{}: error sending delayed message: {}", self_id, e);
1564 }
1565 });
1566 Ok(())
1567 }
1568
1569 fn start(self, actor: A, receivers: InstanceReceivers<A>) -> ActorHandle<A> {
1572 let instance_cell = self.inner.cell.clone();
1573 let actor_id = self.inner.cell.actor_id().clone();
1574 let actor_handle = ActorHandle::new(self.inner.cell.clone(), self.inner.ports.clone());
1575
1576 tokio::spawn(crate::introspect::serve_introspect(
1580 self.inner.cell.clone(),
1581 self.inner.mailbox.clone(),
1582 receivers.introspect,
1583 ));
1584
1585 let actor_loop_receivers = receivers
1586 .actor_loop
1587 .expect("non-detached instance must have actor loop receivers");
1588 let actor_task_handle = A::spawn_server_task(
1589 panic_handler::with_backtrace_tracking(self.serve(
1590 actor,
1591 actor_loop_receivers,
1592 receivers.work,
1593 ))
1594 .instrument(Span::current()),
1595 );
1596 tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
1597 instance_cell
1598 .inner
1599 .actor_task_handle
1600 .set(actor_task_handle)
1601 .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));
1602
1603 actor_handle
1604 }
1605
1606 async fn serve(
1607 mut self,
1608 mut actor: A,
1609 actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1610 mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
1611 ) {
1612 let result = self
1613 .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
1614 .await;
1615
1616 assert!(self.is_stopping());
1617 let event = match result {
1618 Ok(stop_reason) => {
1619 let status = ActorStatus::Stopped(stop_reason);
1620 self.mailbox().close(status.clone());
1621 let event = ActorSupervisionEvent::new(
1622 self.inner.cell.actor_id().clone(),
1623 actor.display_name(),
1624 status.clone(),
1625 None,
1626 );
1627 *self.inner.cell.inner.supervision_event.lock().unwrap() = Some(event.clone());
1629 self.change_status(status);
1630 Some(event)
1631 }
1632 Err(err) => {
1633 match *err.kind {
1634 ActorErrorKind::UnhandledSupervisionEvent(box event) => {
1635 assert!(event.actor_status.is_terminal());
1637 self.mailbox().close(event.actor_status.clone());
1638 *self.inner.cell.inner.supervision_event.lock().unwrap() =
1640 Some(event.clone());
1641 self.change_status(event.actor_status.clone());
1642 Some(event)
1643 }
1644 _ => {
1645 let error_kind = ActorErrorKind::Generic(err.kind.to_string());
1646 let status = ActorStatus::Failed(error_kind);
1647 self.mailbox().close(status.clone());
1648 let event = ActorSupervisionEvent::new(
1649 self.inner.cell.actor_id().clone(),
1650 actor.display_name(),
1651 status.clone(),
1652 None,
1653 );
1654 *self.inner.cell.inner.supervision_event.lock().unwrap() =
1656 Some(event.clone());
1657 self.change_status(status);
1658 Some(event)
1659 }
1660 }
1661 }
1662 };
1663
1664 if let Some(parent) = self.inner.cell.maybe_unlink_parent() {
1665 if let Some(event) = event {
1666 parent.send_supervision_event_or_crash(&self, event);
1668 }
1669 if let Err(err) = parent.signal(Signal::ChildStopped(self.inner.cell.pid())) {
1672 tracing::error!(
1673 "{}: failed to send stop message to parent pid {}: {:?}",
1674 self.self_id(),
1675 parent.pid(),
1676 err
1677 );
1678 }
1679 } else {
1680 if let Some(event) = event {
1686 self.inner
1687 .proc
1688 .handle_unhandled_supervision_event(&self, event);
1689 }
1690 }
1691 }
1692
    /// Runs the actor, then tears down its subtree and runs cleanup.
    ///
    /// Steps, in order:
    /// 1. Runs `Self::run`. A panic is caught and turned into an
    ///    `ActorErrorKind::panic` error that carries the captured panic info.
    /// 2. Moves the status to `Stopping` and logs any failure.
    /// 3. Sends `Signal::Stop` to every child. Children that cannot be
    ///    signalled are unlinked immediately.
    /// 4. Waits for `ChildStopped` signals until no children remain. If a
    ///    wait sees no signal within 500ms, all remaining children are
    ///    unlinked and waiting stops.
    /// 5. Unless the actor panicked, calls `actor.cleanup`, bounded by
    ///    `config::CLEANUP_TIMEOUT`.
    ///
    /// Returns the stop reason on success. If both the actor and cleanup
    /// fail, the actor's error is returned and the cleanup error is only
    /// logged.
    async fn run_actor_tree(
        &mut self,
        actor: &mut A,
        mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
    ) -> Result<String, ActorError> {
        // Set when the actor's future panicked; cleanup is skipped in that case.
        let mut did_panic = false;
        let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers, work_rx))
            .catch_unwind()
            .await
        {
            Ok(result) => result,
            Err(_) => {
                did_panic = true;
                let panic_info = panic_handler::take_panic_info()
                    .map(|info| info.to_string())
                    .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
                Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::panic(anyhow::anyhow!(panic_info)),
                ))
            }
        };

        assert!(!self.is_terminal());
        self.change_status(ActorStatus::Stopping);
        if let Err(err) = &result {
            tracing::error!("{}: actor failure: {}", self.self_id(), err);
        }

        // Collect failures first and unlink after the loop, so the children
        // map is not modified while it is being iterated.
        let mut to_unlink = Vec::new();
        for child in self.inner.cell.child_iter() {
            if let Err(err) = child
                .value()
                .signal(Signal::Stop("parent stopping".to_string()))
            {
                tracing::error!(
                    "{}: failed to send stop signal to child pid {}: {:?}",
                    self.self_id(),
                    child.key(),
                    err
                );
                to_unlink.push(child.value().clone());
            }
        }
        for child in to_unlink {
            self.inner.cell.unlink(&child);
        }

        let (mut signal_receiver, _) = actor_loop_receivers;
        while self.inner.cell.child_count() > 0 {
            match tokio::time::timeout(Duration::from_millis(500), signal_receiver.recv()).await {
                Ok(signal) => {
                    // NOTE(review): `signal?` returns early on a receive error,
                    // which skips `actor.cleanup` and discards `result`.
                    // Confirm this is intended.
                    // Signals other than `ChildStopped` are ignored here.
                    if let Signal::ChildStopped(pid) = signal? {
                        // A stopping child unlinks itself (in `serve`) before it
                        // sends `ChildStopped`.
                        assert!(self.inner.cell.get_child(pid).is_none());
                    }
                }
                Err(_) => {
                    tracing::warn!(
                        "timeout waiting for ChildStopped signal from child on actor: {}, ignoring",
                        self.self_id()
                    );
                    self.inner.cell.unlink_all();
                    break;
                }
            }
        }
        let cleanup_result = if !did_panic {
            let cleanup_timeout = hyperactor_config::global::get(config::CLEANUP_TIMEOUT);
            match tokio::time::timeout(cleanup_timeout, actor.cleanup(self, result.as_ref().err()))
                .await
            {
                Ok(Ok(x)) => Ok(x),
                Ok(Err(e)) => Err(ActorError::new(self.self_id(), ActorErrorKind::cleanup(e))),
                // The cleanup timed out.
                Err(e) => Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::cleanup(e.into()),
                )),
            }
        } else {
            Ok(())
        };
        if let Err(ref actor_err) = result {
            if let Err(ref err) = cleanup_result {
                tracing::warn!(
                    cleanup_err = %err,
                    %actor_err,
                    "ignoring cleanup error after actor error",
                );
            }
        }
        // The actor's own error wins; otherwise a cleanup error is surfaced.
        result.and_then(|reason| cleanup_result.map(|_| reason))
    }
1808
1809 async fn run(
1813 &mut self,
1814 actor: &mut A,
1815 actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1816 work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
1817 ) -> Result<String, ActorError> {
1818 let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;
1819
1820 self.change_status(ActorStatus::Initializing);
1821 actor
1822 .init(self)
1823 .await
1824 .map_err(|err| ActorError::new(self.self_id(), ActorErrorKind::init(err)))?;
1825 let need_drain;
1826 let stop_reason;
1827 'messages: loop {
1828 self.change_status(ActorStatus::Idle);
1829 let metric_pairs =
1830 hyperactor_telemetry::kv_pairs!("actor_id" => self.self_id().to_string());
1831 tokio::select! {
1832 work = work_rx.recv() => {
1833 ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
1834 ACTOR_MESSAGE_QUEUE_SIZE.add(-1, metric_pairs);
1835 let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
1836 let work = work.expect("inconsistent work queue state");
1837 if let Err(err) = work.handle(actor, self).await {
1838 for supervision_event in supervision_event_receiver.drain() {
1839 self.handle_supervision_event(actor, supervision_event).await?;
1840 }
1841 let kind = ActorErrorKind::processing(err);
1842 return Err(ActorError {
1843 actor_id: Box::new(self.self_id().clone()),
1844 kind: Box::new(kind),
1845 });
1846 }
1847 }
1848 signal = signal_receiver.recv() => {
1849 let signal = signal.map_err(ActorError::from);
1850 tracing::debug!("Received signal {signal:?}");
1851 match signal? {
1852 Signal::Stop(reason) => {
1853 need_drain = false;
1854 stop_reason = reason;
1855 break 'messages;
1856 },
1857 Signal::DrainAndStop(reason) => {
1858 need_drain = true;
1859 stop_reason = reason;
1860 break 'messages;
1861 },
1862 Signal::ChildStopped(pid) => {
1863 assert!(self.inner.cell.get_child(pid).is_none());
1864 },
1865 Signal::Abort(reason) => {
1866 return Err(ActorError { actor_id: Box::new(self.self_id().clone()), kind: Box::new(ActorErrorKind::Aborted(reason)) });
1867 }
1868 }
1869 }
1870 Ok(supervision_event) = supervision_event_receiver.recv() => {
1871 self.handle_supervision_event(actor, supervision_event).await?;
1872 }
1873 }
1874 self.inner
1875 .cell
1876 .inner
1877 .num_processed_messages
1878 .fetch_add(1, Ordering::SeqCst);
1879 }
1880
1881 if need_drain {
1882 let mut n = 0;
1883 while let Ok(work) = work_rx.try_recv() {
1884 if let Err(err) = work.handle(actor, self).await {
1885 return Err(ActorError::new(
1886 self.self_id(),
1887 ActorErrorKind::processing(err),
1888 ));
1889 }
1890 n += 1;
1891 }
1892 tracing::debug!("drained {} messages", n);
1893 }
1894 tracing::debug!(
1895 actor_id = %self.self_id(),
1896 reason = stop_reason,
1897 "exited actor loop",
1898 );
1899 Ok(stop_reason)
1900 }
1901
1902 pub async fn handle_supervision_event(
1904 &self,
1905 actor: &mut A,
1906 supervision_event: ActorSupervisionEvent,
1907 ) -> Result<(), ActorError> {
1908 match actor
1910 .handle_supervision_event(self, &supervision_event)
1911 .await
1912 {
1913 Ok(true) => {
1914 Ok(())
1916 }
1917 Ok(false) => {
1918 let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
1919 Err(ActorError::new(self.self_id(), kind))
1920 }
1921 Err(err) => {
1922 let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
1925 err.to_string(),
1926 Box::new(supervision_event),
1927 );
1928 Err(ActorError::new(self.self_id(), kind))
1929 }
1930 }
1931 }
1932
1933 async unsafe fn handle_message<M: Message>(
1934 &self,
1935 actor: &mut A,
1936 type_info: Option<&'static TypeInfo>,
1937 headers: Flattrs,
1938 message: M,
1939 ) -> Result<(), anyhow::Error>
1940 where
1941 A: Handler<M>,
1942 {
1943 let handler_info = match type_info {
1945 Some(info) => {
1946 let arm = unsafe { info.arm_unchecked(&message as *const M as *const ()) };
1948 HandlerInfo::from_static(info.typename(), arm)
1949 }
1950 None => {
1951 HandlerInfo::from_static(std::any::type_name::<M>(), None)
1953 }
1954 };
1955
1956 let endpoint = type_info.and_then(|info| {
1957 unsafe { info.endpoint_name(&message as *const M as *const ()) }
1959 });
1960
1961 self.handle_message_with_handler_info(actor, handler_info, headers, message, endpoint)
1963 .await
1964 }
1965
    /// Invokes the actor's `Handler<M>` for `message`, with status tracking
    /// and telemetry around it.
    ///
    /// Before the handler runs, this:
    /// - sets the status to `Processing`;
    /// - calls `log_message_latency_if_sampling`;
    /// - if the headers carry `TELEMETRY_MESSAGE_ID`, emits a `MessageEvent`
    ///   and an "active" status event;
    /// - records `handler_info` as the last message handler.
    ///
    /// After the handler returns, it adds the elapsed time (in microseconds)
    /// to the cell's total processing time. It also emits a "complete"
    /// status event when a telemetry id was present. The handler's result is
    /// returned unchanged.
    #[tracing::instrument(level = "debug", name = "handle_message", skip_all, fields(actor_id = %self.self_id(), message_type = %handler_info))]
    async fn handle_message_with_handler_info<M: Message>(
        &self,
        actor: &mut A,
        handler_info: HandlerInfo,
        headers: Flattrs,
        message: M,
        endpoint: Option<String>,
    ) -> Result<(), anyhow::Error>
    where
        A: Handler<M>,
    {
        let now = std::time::SystemTime::now();
        let handler_info = Some(handler_info);
        self.change_status(ActorStatus::Processing(now, handler_info.clone()));
        crate::mailbox::headers::log_message_latency_if_sampling(
            &headers,
            self.self_id().to_string(),
        );

        // Message-level telemetry is emitted only when this header is present.
        let message_id = headers.get(crate::mailbox::headers::TELEMETRY_MESSAGE_ID);

        if let Some(message_id) = message_id {
            // 0 is used when the sender's actor-id hash header is missing.
            let from_actor_id = headers
                .get(crate::mailbox::headers::SENDER_ACTOR_ID_HASH)
                .unwrap_or(0);
            let to_actor_id = hash_to_u64(self.self_id());
            let port_id = headers.get(crate::mailbox::headers::TELEMETRY_PORT_ID);

            notify_message(hyperactor_telemetry::MessageEvent {
                timestamp: now,
                id: message_id,
                from_actor_id,
                to_actor_id,
                endpoint,
                port_id,
            });

            notify_message_status(hyperactor_telemetry::MessageStatusEvent {
                timestamp: now,
                id: hyperactor_telemetry::generate_status_event_id(message_id),
                message_id,
                status: "active".to_string(),
            });
        }

        *self.inner.cell.inner.last_message_handler.write().unwrap() = handler_info;

        let context = Context::new(self, headers);
        let start = Instant::now();
        // The handler runs inside this cell's telemetry recording span.
        let result = actor
            .handle(&context, message)
            .instrument(self.inner.cell.inner.recording.span())
            .await;
        let elapsed_us = start.elapsed().as_micros() as u64;
        self.inner
            .cell
            .inner
            .total_processing_time_us
            .fetch_add(elapsed_us, Ordering::SeqCst);

        if let Some(message_id) = message_id {
            notify_message_status(hyperactor_telemetry::MessageStatusEvent {
                timestamp: std::time::SystemTime::now(),
                id: hyperactor_telemetry::generate_status_event_id(message_id),
                message_id,
                status: "complete".to_string(),
            });
        }

        result
    }
2043
2044 pub fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
2046 self.inner.proc.spawn_child(self.inner.cell.clone(), actor)
2047 }
2048
2049 pub fn spawn_with_name<C: Actor>(
2053 &self,
2054 name: &str,
2055 actor: C,
2056 ) -> anyhow::Result<ActorHandle<C>> {
2057 self.inner
2058 .proc
2059 .spawn_named_child(self.inner.cell.clone(), name, actor)
2060 }
2061
2062 pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> {
2064 self.inner.proc.child_instance(self.inner.cell.clone())
2065 }
2066
2067 pub fn port<M: Message>(&self) -> PortHandle<M>
2070 where
2071 A: Handler<M>,
2072 {
2073 self.inner.ports.get()
2074 }
2075
2076 pub fn handle(&self) -> ActorHandle<A> {
2078 ActorHandle::new(self.inner.cell.clone(), Arc::clone(&self.inner.ports))
2079 }
2080
2081 pub fn bind<R: Binds<A>>(&self) -> reference::ActorRef<R> {
2083 self.inner.cell.bind(self.inner.ports.as_ref())
2084 }
2085
2086 #[doc(hidden)]
2088 pub fn mailbox_for_py(&self) -> &Mailbox {
2089 &self.inner.mailbox
2090 }
2091
2092 pub fn proc(&self) -> &Proc {
2094 &self.inner.proc
2095 }
2096
2097 #[doc(hidden)]
2101 pub fn clone_for_py(&self) -> Self {
2102 Self {
2103 inner: Arc::clone(&self.inner),
2104 }
2105 }
2106
2107 fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
2109 self.inner.cell.inner.actor_task_handle.get()
2110 }
2111
2112 pub fn sequencer(&self) -> &Sequencer {
2114 &self.inner.sequencer
2115 }
2116
2117 pub fn instance_id(&self) -> Uuid {
2119 self.inner.id
2120 }
2121
2122 pub fn parent_handle<P: Actor>(&self) -> Option<ActorHandle<P>> {
2124 let parent_cell = self.inner.cell.inner.parent.upgrade()?;
2125 let ports = if let Ok(ports) = parent_cell.inner.ports.clone().downcast() {
2126 ports
2127 } else {
2128 return None;
2129 };
2130 Some(ActorHandle::new(parent_cell, ports))
2131 }
2132}
2133
2134impl<A: Actor> context::Mailbox for Instance<A> {
2135 fn mailbox(&self) -> &Mailbox {
2136 &self.inner.mailbox
2137 }
2138}
2139
2140impl<A: Actor> context::Mailbox for Context<'_, A> {
2141 fn mailbox(&self) -> &Mailbox {
2142 &self.instance.inner.mailbox
2143 }
2144}
2145
2146impl<A: Actor> context::Mailbox for &Instance<A> {
2147 fn mailbox(&self) -> &Mailbox {
2148 &self.inner.mailbox
2149 }
2150}
2151
2152impl<A: Actor> context::Mailbox for &Context<'_, A> {
2153 fn mailbox(&self) -> &Mailbox {
2154 &self.instance.inner.mailbox
2155 }
2156}
2157
2158impl<A: Actor> context::Actor for Instance<A> {
2159 type A = A;
2160 fn instance(&self) -> &Instance<A> {
2161 self
2162 }
2163}
2164
2165impl<A: Actor> context::Actor for Context<'_, A> {
2166 type A = A;
2167 fn instance(&self) -> &Instance<A> {
2168 self
2169 }
2170}
2171
2172impl<A: Actor> context::Actor for &Instance<A> {
2173 type A = A;
2174 fn instance(&self) -> &Instance<A> {
2175 self
2176 }
2177}
2178
2179impl<A: Actor> context::Actor for &Context<'_, A> {
2180 type A = A;
2181 fn instance(&self) -> &Instance<A> {
2182 self
2183 }
2184}
2185
2186impl Instance<()> {
2187 pub fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
2189 assert!(
2190 self.actor_task_handle().is_none(),
2191 "can only bind actor port on instance with no running actor task"
2192 );
2193 self.inner.mailbox.bind_actor_port()
2194 }
2195}
2196
/// The type of actor hosted by an `InstanceCell`.
#[derive(Debug)]
enum ActorType {
    /// A type with registered `TypeInfo`.
    Named(&'static TypeInfo),
    /// A type known only by its name string.
    Anonymous(&'static str),
}
2202
2203impl ActorType {
2204 fn type_name(&self) -> &str {
2205 match self {
2206 ActorType::Named(info) => info.typename(),
2207 ActorType::Anonymous(name) => name,
2208 }
2209 }
2210}
2211
/// A cheaply clonable, shared handle to an actor instance's runtime state.
///
/// Clones share the same `InstanceCellState` through an `Arc`.
#[derive(Clone)]
pub struct InstanceCell {
    inner: Arc<InstanceCellState>,
}
2221
2222impl fmt::Debug for InstanceCell {
2223 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2224 f.debug_struct("InstanceCell")
2225 .field("actor_id", &self.inner.actor_id)
2226 .field("actor_type", &self.inner.actor_type)
2227 .finish()
2228 }
2229}
2230
/// Shared runtime state behind an `InstanceCell`.
struct InstanceCellState {
    /// Identity of this actor.
    actor_id: reference::ActorId,

    /// The hosted actor's type, as reported by `Debug` and `actor_type_name`.
    actor_type: ActorType,

    /// The proc this actor lives on.
    proc: Proc,

    /// Senders for the actor loop's signal and supervision ports. `None` for
    /// detached instances, which have no actor loop.
    actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,

    /// Receiver for the actor's current lifecycle status.
    status: watch::Receiver<ActorStatus>,

    /// Weak link to the parent; empty when no parent was given at creation.
    parent: WeakInstanceCell,

    /// Linked children, keyed by pid.
    children: DashMap<reference::Index, InstanceCell>,

    /// Join handle of the actor loop task; set once when the actor starts.
    actor_task_handle: OnceLock<JoinHandle<()>>,

    /// Ports exported by `bind`: port index -> port name.
    exported_named_ports: DashMap<u64, &'static str>,

    /// Number of completed actor-loop iterations.
    num_processed_messages: AtomicU64,

    /// Wall-clock time at which this cell was created.
    created_at: SystemTime,

    /// Handler info of the most recently started message handler.
    last_message_handler: RwLock<Option<HandlerInfo>>,

    /// Cumulative time spent in message handlers, in microseconds.
    total_processing_time_us: AtomicU64,

    /// Telemetry recording; message handlers run inside its span.
    recording: Recording,

    /// Attributes published for introspection, if any.
    published_attrs: RwLock<Option<hyperactor_config::Attrs>>,

    /// Optional callback that answers introspection queries about children.
    query_child_handler: RwLock<
        Option<Box<dyn (Fn(&crate::reference::Reference) -> IntrospectResult) + Send + Sync>>,
    >,

    /// Terminal supervision event, recorded when the actor stops.
    supervision_event: std::sync::Mutex<Option<crate::supervision::ActorSupervisionEvent>>,

    /// Whether this is a system actor.
    is_system: AtomicBool,

    /// The actor's `Ports<A>`, type-erased. Recovered by `downcast_handle`.
    ports: Arc<dyn Any + Send + Sync>,
}
2309
2310impl InstanceCellState {
2311 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
2314 self.parent
2315 .upgrade()
2316 .filter(|parent| parent.inner.unlink(self))
2317 }
2318
2319 fn unlink(&self, child: &InstanceCellState) -> bool {
2321 assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
2322 self.children.remove(&child.actor_id.pid()).is_some()
2323 }
2324}
2325
/// Chooses which terminated-actor snapshots to evict when the retention
/// limit is exceeded by `excess` entries.
///
/// Each entry is `(key, occurred_at)`. `occurred_at` is `Some` (an RFC 3339
/// timestamp string) for snapshots that record a failure, and `None` for
/// clean ones.
///
/// Eviction order:
/// - Clean entries go first, in input order.
/// - If more removals are needed, failed entries go next, newest failure
///   first. Timestamps are compared as strings, and equal timestamps keep
///   input order. The earliest failures are therefore retained longest.
///
/// Returns at most `excess` keys, fewer if there are not enough entries.
/// Generic over the key type so any clonable identifier can be used.
fn select_eviction_candidates<K: Clone>(
    entries: &[(K, Option<String>)],
    excess: usize,
) -> Vec<K> {
    let (clean, mut failed): (Vec<_>, Vec<_>) = entries
        .iter()
        .partition(|(_, occurred_at)| occurred_at.is_none());

    let mut to_remove: Vec<K> = clean
        .into_iter()
        .take(excess)
        .map(|(key, _)| key.clone())
        .collect();

    // `take(excess)` guarantees this cannot underflow.
    let remaining = excess - to_remove.len();
    if remaining > 0 {
        // Newest failure first. `sort_by` is stable, so ties keep input order.
        failed.sort_by(|(_, a), (_, b)| b.cmp(a));
        to_remove.extend(
            failed
                .into_iter()
                .take(remaining)
                .map(|(key, _)| key.clone()),
        );
    }

    to_remove
}
2373
2374impl InstanceCell {
2375 fn new(
2378 actor_id: reference::ActorId,
2379 actor_type: ActorType,
2380 proc: Proc,
2381 actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
2382 status: watch::Receiver<ActorStatus>,
2383 parent: Option<InstanceCell>,
2384 ports: Arc<dyn Any + Send + Sync>,
2385 ) -> Self {
2386 let _ais = actor_id.to_string();
2387 let cell = Self {
2388 inner: Arc::new(InstanceCellState {
2389 actor_id: actor_id.clone(),
2390 actor_type,
2391 proc: proc.clone(),
2392 actor_loop,
2393 status,
2394 parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
2395 children: DashMap::new(),
2396 actor_task_handle: OnceLock::new(),
2397 exported_named_ports: DashMap::new(),
2398 num_processed_messages: AtomicU64::new(0),
2399 created_at: std::time::SystemTime::now(),
2400 last_message_handler: RwLock::new(None),
2401 total_processing_time_us: AtomicU64::new(0),
2402 recording: hyperactor_telemetry::recorder().record(64),
2403 published_attrs: RwLock::new(None),
2404 query_child_handler: RwLock::new(None),
2405 supervision_event: std::sync::Mutex::new(None),
2406 is_system: AtomicBool::new(false),
2407 ports,
2408 }),
2409 };
2410 cell.maybe_link_parent();
2411 proc.inner
2412 .instances
2413 .insert(actor_id.clone(), cell.downgrade());
2414 cell
2415 }
2416
2417 fn wrap(inner: Arc<InstanceCellState>) -> Self {
2418 Self { inner }
2419 }
2420
2421 pub fn actor_id(&self) -> &reference::ActorId {
2423 &self.inner.actor_id
2424 }
2425
2426 pub(crate) fn pid(&self) -> reference::Index {
2428 self.inner.actor_id.pid()
2429 }
2430
2431 #[allow(dead_code)]
2433 pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
2434 self.inner.actor_task_handle.get()
2435 }
2436
2437 pub fn status(&self) -> &watch::Receiver<ActorStatus> {
2439 &self.inner.status
2440 }
2441
2442 pub fn supervision_event(&self) -> Option<crate::supervision::ActorSupervisionEvent> {
2445 self.inner.supervision_event.lock().unwrap().clone()
2446 }
2447
2448 pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
2450 if let Some((signal_port, _)) = &self.inner.actor_loop {
2451 static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
2453 let client = &CLIENT
2454 .get_or_init(|| Proc::runtime().instance("global_signal_client").unwrap())
2455 .0;
2456 signal_port.send(&client, signal).map_err(ActorError::from)
2457 } else {
2458 tracing::warn!(
2459 "{}: attempted to send signal {} to detached actor",
2460 self.inner.actor_id,
2461 signal
2462 );
2463 Ok(())
2464 }
2465 }
2466
2467 pub fn send_supervision_event_or_crash(
2476 &self,
2477 child_cx: &impl context::Actor, event: ActorSupervisionEvent,
2479 ) {
2480 match &self.inner.actor_loop {
2481 Some((_, supervision_port)) => {
2482 if let Err(err) = supervision_port.send(child_cx, event.clone()) {
2483 if !event.is_error() {
2484 tracing::debug!(
2490 "{}: dropping non-error supervision event {}: {:?}",
2491 self.actor_id(),
2492 event,
2493 err
2494 );
2495 return;
2496 }
2497 tracing::error!(
2498 "{}: failed to send supervision event to actor: {:?}. Crash the process.",
2499 self.actor_id(),
2500 err
2501 );
2502 std::process::exit(1);
2503 }
2504 }
2505 None => {
2506 if !event.is_error() {
2507 tracing::debug!(
2508 "{}: dropping non-error supervision event {} to detached actor",
2509 self.actor_id(),
2510 event,
2511 );
2512 return;
2513 }
2514 tracing::error!(
2515 "{}: failed: {}: cannot send supervision event to detached actor: crashing",
2516 self.actor_id(),
2517 event,
2518 );
2519 std::process::exit(1);
2520 }
2521 }
2522 }
2523
2524 pub fn downgrade(&self) -> WeakInstanceCell {
2526 WeakInstanceCell {
2527 inner: Arc::downgrade(&self.inner),
2528 }
2529 }
2530
2531 fn link(&self, child: InstanceCell) {
2533 assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
2534 self.inner.children.insert(child.pid(), child);
2535 }
2536
2537 fn unlink(&self, child: &InstanceCell) {
2539 assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
2540 self.inner.children.remove(&child.pid());
2541 }
2542
2543 fn unlink_all(&self) {
2545 self.inner.children.clear();
2546 }
2547
2548 fn maybe_link_parent(&self) {
2550 if let Some(parent) = self.inner.parent.upgrade() {
2551 parent.link(self.clone());
2552 }
2553 }
2554
2555 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
2558 self.inner.maybe_unlink_parent()
2559 }
2560
2561 fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, reference::Index, InstanceCell>> {
2564 self.inner.children.iter()
2565 }
2566
2567 pub fn child_count(&self) -> usize {
2569 self.inner.children.len()
2570 }
2571
2572 pub fn child_actor_ids(&self) -> Vec<reference::ActorId> {
2574 self.inner
2575 .children
2576 .iter()
2577 .map(|entry| entry.value().actor_id().clone())
2578 .collect()
2579 }
2580
2581 fn get_child(&self, pid: reference::Index) -> Option<InstanceCell> {
2583 self.inner.children.get(&pid).map(|child| child.clone())
2584 }
2585
2586 pub fn recording(&self) -> &Recording {
2588 &self.inner.recording
2589 }
2590
2591 pub fn created_at(&self) -> SystemTime {
2593 self.inner.created_at
2594 }
2595
2596 pub fn num_processed_messages(&self) -> u64 {
2598 self.inner.num_processed_messages.load(Ordering::SeqCst)
2599 }
2600
2601 pub fn last_message_handler(&self) -> Option<HandlerInfo> {
2603 self.inner.last_message_handler.read().unwrap().clone()
2604 }
2605
2606 pub fn total_processing_time_us(&self) -> u64 {
2608 self.inner.total_processing_time_us.load(Ordering::SeqCst)
2609 }
2610
2611 pub fn parent(&self) -> Option<InstanceCell> {
2613 self.inner.parent.upgrade()
2614 }
2615
2616 pub fn actor_type_name(&self) -> &str {
2618 self.inner.actor_type.type_name()
2619 }
2620
2621 pub fn set_published_attrs(&self, attrs: hyperactor_config::Attrs) {
2623 *self.inner.published_attrs.write().unwrap() = Some(attrs);
2624 }
2625
2626 pub fn merge_published_attr<T: hyperactor_config::AttrValue>(
2629 &self,
2630 key: hyperactor_config::Key<T>,
2631 value: T,
2632 ) {
2633 self.inner
2634 .published_attrs
2635 .write()
2636 .unwrap()
2637 .get_or_insert_with(hyperactor_config::Attrs::new)
2638 .set(key, value);
2639 }
2640
2641 pub fn published_attrs(&self) -> Option<hyperactor_config::Attrs> {
2643 self.inner.published_attrs.read().unwrap().clone()
2644 }
2645
2646 pub fn set_query_child_handler(
2654 &self,
2655 handler: impl (Fn(&crate::reference::Reference) -> IntrospectResult) + Send + Sync + 'static,
2656 ) {
2657 *self.inner.query_child_handler.write().unwrap() = Some(Box::new(handler));
2658 }
2659
2660 pub fn query_child(&self, child_ref: &crate::reference::Reference) -> Option<IntrospectResult> {
2662 let guard = self.inner.query_child_handler.read().unwrap();
2663 guard.as_ref().map(|handler| handler(child_ref))
2664 }
2665
2666 pub fn is_system(&self) -> bool {
2668 self.inner.is_system.load(Ordering::Relaxed)
2669 }
2670
2671 pub fn store_terminated_snapshot(&self, payload: crate::introspect::IntrospectResult) {
2681 let snapshots = &self.inner.proc.inner.terminated_snapshots;
2682 snapshots.insert(self.actor_id().clone(), payload);
2683 let max = hyperactor_config::global::get(crate::config::TERMINATED_SNAPSHOT_RETENTION);
2684 let excess = snapshots.len().saturating_sub(max);
2685 if excess > 0 {
2686 let entries: Vec<_> = snapshots
2688 .iter()
2689 .map(|entry| {
2690 let occurred_at =
2691 serde_json::from_str::<hyperactor_config::Attrs>(&entry.value().attrs)
2692 .ok()
2693 .and_then(|attrs| {
2694 attrs
2696 .get(crate::introspect::FAILURE_ERROR_MESSAGE)
2697 .cloned()?;
2698 attrs
2700 .get(crate::introspect::FAILURE_OCCURRED_AT)
2701 .map(|t| humantime::format_rfc3339(*t).to_string())
2702 });
2703 (entry.key().clone(), occurred_at)
2704 })
2705 .collect();
2706
2707 for key in select_eviction_candidates(&entries, excess) {
2708 snapshots.remove(&key);
2709 }
2710 }
2711 }
2712
2713 pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &Ports<A>) -> reference::ActorRef<R> {
2716 <R as Binds<A>>::bind(ports);
2717 ports.bind::<Signal>();
2729 ports.bind::<Undeliverable<MessageEnvelope>>();
2730 for entry in ports.bound.iter() {
2732 self.inner
2733 .exported_named_ports
2734 .insert(*entry.key(), entry.value());
2735 }
2736 reference::ActorRef::attest(self.actor_id().clone())
2737 }
2738
2739 pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
2741 let ports = Arc::clone(&self.inner.ports).downcast::<Ports<A>>().ok()?;
2742 Some(ActorHandle::new(self.clone(), ports))
2743 }
2744
2745 pub fn traverse<F>(&self, f: &mut F)
2749 where
2750 F: FnMut(&InstanceCell, usize),
2751 {
2752 self.traverse_inner(0, f);
2753 }
2754
2755 fn traverse_inner<F>(&self, depth: usize, f: &mut F)
2756 where
2757 F: FnMut(&InstanceCell, usize),
2758 {
2759 f(self, depth);
2760 let mut children: Vec<_> = self.child_iter().map(|r| r.value().clone()).collect();
2762 children.sort_by_key(|c| c.pid());
2763 for child in children {
2764 child.traverse_inner(depth + 1, f);
2765 }
2766 }
2767}
2768
2769impl Drop for InstanceCellState {
2770 fn drop(&mut self) {
2771 if let Some(parent) = self.maybe_unlink_parent() {
2772 tracing::debug!(
2773 "instance {} was dropped with parent {} still linked",
2774 self.actor_id,
2775 parent.actor_id()
2776 );
2777 }
2778 if self.proc.inner.instances.remove(&self.actor_id).is_none() {
2779 tracing::error!("instance {} was dropped but not in proc", self.actor_id);
2780 }
2781 }
2782}
2783
/// A non-owning reference to an `InstanceCell`'s state.
///
/// Used for parent links and for the proc's instance table, so that these
/// references do not keep instances alive.
#[derive(Debug, Clone)]
pub struct WeakInstanceCell {
    inner: Weak<InstanceCellState>,
}
2790
2791impl Default for WeakInstanceCell {
2792 fn default() -> Self {
2793 Self::new()
2794 }
2795}
2796
2797impl WeakInstanceCell {
2798 pub fn new() -> Self {
2800 Self { inner: Weak::new() }
2801 }
2802
2803 pub fn upgrade(&self) -> Option<InstanceCell> {
2805 self.inner.upgrade().map(InstanceCell::wrap)
2806 }
2807}
2808
/// Per-actor registry of typed message ports.
pub struct Ports<A: Actor> {
    /// Port handles keyed by message `TypeId`. Each value is a `PortHandle<M>`
    /// boxed as `Any`.
    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
    /// Ports bound for remote messages: port index (`M::port()`) -> name.
    bound: DashMap<u64, &'static str>,
    /// Mailbox on which the ports are opened.
    mailbox: Mailbox,
    /// Queue onto which handler work for incoming messages is enqueued.
    workq: OrderedSender<WorkCell<A>>,
}
2819
2820impl<A: Actor> Ports<A> {
2821 fn new(mailbox: Mailbox, workq: OrderedSender<WorkCell<A>>) -> Self {
2822 Self {
2823 ports: DashMap::new(),
2824 bound: DashMap::new(),
2825 mailbox,
2826 workq,
2827 }
2828 }
2829
    /// Returns the port handle for message type `M`, creating it on first use.
    ///
    /// A new port is opened on the mailbox with an enqueue function. For each
    /// incoming message, that function wraps a call to
    /// `Instance::handle_message` in a `WorkCell` and pushes it onto the
    /// actor's work queue.
    ///
    /// When the queue has buffering enabled, the message's `SEQ_INFO` header
    /// selects the path:
    /// - `Session` messages go through the sequenced `send`;
    /// - `Direct` messages use `direct_send`;
    /// - a missing header is an error.
    ///
    /// # Panics
    /// - If `M` is `Signal` or `IntrospectMessage`; those ports must not be
    ///   provisioned through this method.
    /// - The `unwrap` on an existing entry relies on every entry stored under
    ///   `TypeId::of::<M>()` being a `PortHandle<M>`.
    pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
    where
        A: Handler<M>,
    {
        let key = TypeId::of::<M>();
        match self.ports.entry(key) {
            Entry::Vacant(entry) => {
                assert_ne!(
                    key,
                    TypeId::of::<Signal>(),
                    "cannot provision Signal port through `Ports::get`"
                );
                assert_ne!(
                    key,
                    TypeId::of::<IntrospectMessage>(),
                    "cannot provision IntrospectMessage port through `Ports::get`"
                );

                // Optional: `M` may have no registered `TypeInfo`.
                let type_info = TypeInfo::get_by_typeid(key);
                let workq = self.workq.clone();
                let actor_id = self.mailbox.actor_id().to_string();
                let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
                    let seq_info = headers.get(SEQ_INFO);

                    let work = WorkCell::new(move |actor: &mut A, instance: &Instance<A>| {
                        Box::pin(async move {
                            // SAFETY: `type_info` was looked up by
                            // `TypeId::of::<M>()`, so it describes the type of `msg`.
                            unsafe {
                                instance
                                    .handle_message(actor, type_info, headers, msg)
                                    .await
                            }
                        })
                    });
                    // NOTE(review): the gauge is incremented before the send below;
                    // if the send fails, nothing here decrements it. Confirm that
                    // is intended.
                    ACTOR_MESSAGE_QUEUE_SIZE.add(
                        1,
                        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.clone()),
                    );
                    if workq.enable_buffering {
                        match seq_info {
                            // Presumably ordered by `seq` within `session_id` by the
                            // `OrderedSender`; seq 0 is rejected.
                            Some(SeqInfo::Session { session_id, seq }) => {
                                workq.send(session_id, seq, work).map_err(|e| match e {
                                    OrderedSenderError::InvalidZeroSeq(_) => {
                                        let error_msg = format!(
                                            "in enqueue func for {}, got seq 0 for message type {}",
                                            actor_id,
                                            std::any::type_name::<M>(),
                                        );
                                        tracing::error!(error_msg);
                                        anyhow::anyhow!(error_msg)
                                    }
                                    OrderedSenderError::SendError(e) => anyhow::Error::from(e),
                                    OrderedSenderError::FlushError(e) => e,
                                })
                            }
                            Some(SeqInfo::Direct) => {
                                workq.direct_send(work).map_err(anyhow::Error::from)
                            }
                            None => {
                                let error_msg = format!(
                                    "in enqueue func for {}, buffering is enabled, but SEQ_INFO is not set for message type {}",
                                    actor_id,
                                    std::any::type_name::<M>(),
                                );
                                tracing::error!(error_msg);
                                anyhow::bail!(error_msg);
                            }
                        }
                    } else {
                        workq.direct_send(work).map_err(anyhow::Error::from)
                    }
                });
                entry.insert(Box::new(port.clone()));
                port
            }
            Entry::Occupied(entry) => {
                let port = entry.get();
                port.downcast_ref::<PortHandle<M>>().unwrap().clone()
            }
        }
    }
2915
2916 pub(crate) fn open_message_port<M: Message>(&self) -> Option<(PortHandle<M>, PortReceiver<M>)> {
2919 match self.ports.entry(TypeId::of::<M>()) {
2920 Entry::Vacant(entry) => {
2921 let (port, receiver) = self.mailbox.open_port();
2922 entry.insert(Box::new(port.clone()));
2923 Some((port, receiver))
2924 }
2925 Entry::Occupied(_) => None,
2926 }
2927 }
2928
2929 pub fn bind<M: RemoteMessage>(&self)
2931 where
2932 A: Handler<M>,
2933 {
2934 let port_index = M::port();
2935 match self.bound.entry(port_index) {
2936 Entry::Vacant(entry) => {
2937 self.get::<M>().bind_actor_port();
2938 entry.insert(M::typename());
2939 }
2940 Entry::Occupied(entry) => {
2941 assert_eq!(
2942 *entry.get(),
2943 M::typename(),
2944 "bind {}: port index {} already bound to type {}",
2945 M::typename(),
2946 port_index,
2947 entry.get(),
2948 );
2949 }
2950 }
2951 }
2952}
2953
2954#[cfg(test)]
2955mod tests {
2956 use std::assert_matches::assert_matches;
2957 use std::sync::atomic::AtomicBool;
2958
2959 use hyperactor_macros::export;
2960 use serde_json::json;
2961 use timed_test::async_timed_test;
2962 use tokio::sync::Barrier;
2963 use tokio::sync::oneshot;
2964 use tracing::Level;
2965 use tracing_subscriber::layer::SubscriberExt;
2966 use tracing_test::internal::logs_with_scope_contain;
2967
2968 use super::*;
2969 use crate as hyperactor;
2971 use crate::HandleClient;
2972 use crate::Handler;
2973 use crate::testing::proc_supervison::ProcSupervisionCoordinator;
2974 use crate::testing::process_assertion::assert_termination;
2975
    // Minimal exported actor; all of its behavior lives in the
    // `TestActorMessage` handler below.
    #[derive(Debug, Default)]
    #[export]
    struct TestActor;

    impl Actor for TestActor {}

    // Messages for `TestActor`. Each variant drives a different lifecycle path
    // in the tests: replying, blocking, forwarding, failing, panicking, spawning.
    #[derive(Handler, HandleClient, Debug)]
    enum TestActorMessage {
        // Acknowledge on the sender.
        Reply(oneshot::Sender<()>),
        // Signal the sender, then block until the receiver fires.
        Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
        // Relay the boxed message to the given actor.
        Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
        // Do nothing.
        Noop(),
        // Return the given error from the handler.
        Fail(anyhow::Error),
        // Panic with the given message.
        Panic(String),
        // Spawn a child actor and send back its handle.
        Spawn(oneshot::Sender<ActorHandle<TestActor>>),
    }
2992
2993 impl TestActor {
2994 async fn spawn_child(
2995 cx: &impl context::Actor,
2996 parent: &ActorHandle<TestActor>,
2997 ) -> ActorHandle<TestActor> {
2998 let (tx, rx) = oneshot::channel();
2999 parent.send(cx, TestActorMessage::Spawn(tx)).unwrap();
3000 rx.await.unwrap()
3001 }
3002 }
3003
    #[async_trait]
    #[crate::handle(TestActorMessage)]
    impl TestActorMessageHandler for TestActor {
        // Acknowledge on `sender`.
        async fn reply(
            &mut self,
            _cx: &crate::Context<Self>,
            sender: oneshot::Sender<()>,
        ) -> Result<(), anyhow::Error> {
            sender.send(()).unwrap();
            Ok(())
        }

        // Signal entry on `sender`, then keep the handler (and so the actor)
        // busy until `receiver` fires.
        async fn wait(
            &mut self,
            _cx: &crate::Context<Self>,
            sender: oneshot::Sender<()>,
            receiver: oneshot::Receiver<()>,
        ) -> Result<(), anyhow::Error> {
            sender.send(()).unwrap();
            receiver.await.unwrap();
            Ok(())
        }

        // Relay `message` to `destination`, propagating any send error.
        async fn forward(
            &mut self,
            cx: &crate::Context<Self>,
            destination: ActorHandle<TestActor>,
            message: Box<TestActorMessage>,
        ) -> Result<(), anyhow::Error> {
            destination.send(cx, *message)?;
            Ok(())
        }

        async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
            Ok(())
        }

        // Fail the handler with `err` (drives the actor-failure path).
        async fn fail(
            &mut self,
            _cx: &crate::Context<Self>,
            err: anyhow::Error,
        ) -> Result<(), anyhow::Error> {
            Err(err)
        }

        // Panic with `err_msg` (drives the panic-to-failure path).
        async fn panic(
            &mut self,
            _cx: &crate::Context<Self>,
            err_msg: String,
        ) -> Result<(), anyhow::Error> {
            panic!("{}", err_msg);
        }

        // Spawn a child TestActor from this actor's context and reply with its handle.
        async fn spawn(
            &mut self,
            cx: &crate::Context<Self>,
            reply: oneshot::Sender<ActorHandle<TestActor>>,
        ) -> Result<(), anyhow::Error> {
            let handle = TestActor.spawn(cx)?;
            reply.send(handle).unwrap();
            Ok(())
        }
    }
3068
    // Spawns an actor and follows its status: Idle after handling a message,
    // Processing while a handler is blocked, Stopped("test") after
    // drain_and_stop.
    #[tracing_test::traced_test]
    #[async_timed_test(timeout_secs = 30)]
    async fn test_spawn_actor() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let handle = proc.spawn("test", TestActor).unwrap();

        // Spawning must have logged "<actor_id>: spawned with <task handle>".
        assert!(logs_contain(
            format!(
                "{}: spawned with {:?}",
                handle.actor_id(),
                handle.cell().actor_task_handle().unwrap(),
            )
            .as_str()
        ));

        let mut state = handle.status().clone();

        // Round-trip one message so the actor has processed something.
        let (tx, rx) = oneshot::channel::<()>();
        handle.send(&client, TestActorMessage::Reply(tx)).unwrap();
        rx.await.unwrap();

        state
            .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
            .await
            .unwrap();

        // Block the actor inside a handler: `enter` fires when the handler
        // starts, and sending on `exit` releases it.
        let (enter_tx, enter_rx) = oneshot::channel::<()>();
        let (exit_tx, exit_rx) = oneshot::channel::<()>();

        handle
            .send(&client, TestActorMessage::Wait(enter_tx, exit_rx))
            .unwrap();
        enter_rx.await.unwrap();
        // While blocked, the status is Processing with a start time that is not in the future.
        assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= std::time::SystemTime::now());
        exit_tx.send(()).unwrap();

        state
            .wait_for(|state| matches!(*state, ActorStatus::Idle))
            .await
            .unwrap();

        handle.drain_and_stop("test").unwrap();
        handle.await;
        assert_matches!(&*state.borrow(), ActorStatus::Stopped(reason) if reason == "test");
    }
3119
3120 #[async_timed_test(timeout_secs = 30)]
3121 async fn test_proc_actors_messaging() {
3122 let proc = Proc::local();
3123 let (client, _) = proc.instance("client").unwrap();
3124 let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
3125 let second = proc.spawn::<TestActor>("second", TestActor).unwrap();
3126 let (tx, rx) = oneshot::channel::<()>();
3127 let reply_message = TestActorMessage::Reply(tx);
3128 first
3129 .send(
3130 &client,
3131 TestActorMessage::Forward(second, Box::new(reply_message)),
3132 )
3133 .unwrap();
3134 rx.await.unwrap();
3135 }
3136
    // Actor that checks, from inside a handler, whether an `ActorRef<TestActor>`
    // resolves to a local handle.
    #[derive(Debug, Default)]
    #[export]
    struct LookupTestActor;

    impl Actor for LookupTestActor {}

    #[derive(Handler, HandleClient, Debug)]
    enum LookupTestMessage {
        // Replies with whether the given ref resolves via `downcast_handle`.
        ActorExists(
            reference::ActorRef<TestActor>,
            #[reply] reference::OncePortRef<bool>,
        ),
    }

    #[async_trait]
    #[crate::handle(LookupTestMessage)]
    impl LookupTestMessageHandler for LookupTestActor {
        // True iff `downcast_handle` yields a handle for `actor_ref` in this context.
        async fn actor_exists(
            &mut self,
            cx: &crate::Context<Self>,
            actor_ref: reference::ActorRef<TestActor>,
        ) -> Result<bool, anyhow::Error> {
            Ok(actor_ref.downcast_handle(cx).is_some())
        }
    }
3162
    // `downcast_handle` must resolve only a live actor of the requested type.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_actor_lookup() {
        let proc = Proc::local();
        let (client, _handle) = proc.instance("client").unwrap();

        let target_actor = proc.spawn::<TestActor>("target", TestActor).unwrap();
        let target_actor_ref = target_actor.bind();
        let lookup_actor = proc
            .spawn::<LookupTestActor>("lookup", LookupTestActor)
            .unwrap();

        // The live target resolves.
        assert!(
            lookup_actor
                .actor_exists(&client, target_actor_ref.clone())
                .await
                .unwrap()
        );

        // An id for a child that was never spawned does not resolve.
        assert!(
            !lookup_actor
                .actor_exists(
                    &client,
                    reference::ActorRef::attest(target_actor.actor_id().child_id(123).clone())
                )
                .await
                .unwrap()
        );
        // A live actor that is not a TestActor (the lookup actor itself) does not resolve.
        assert!(
            !lookup_actor
                .actor_exists(
                    &client,
                    reference::ActorRef::attest(lookup_actor.actor_id().clone())
                )
                .await
                .unwrap()
        );

        target_actor.drain_and_stop("test").unwrap();
        target_actor.await;

        // Once stopped, the former target no longer resolves.
        assert!(
            !lookup_actor
                .actor_exists(&client, target_actor_ref)
                .await
                .unwrap()
        );

        lookup_actor.drain_and_stop("test").unwrap();
        lookup_actor.await;
    }
3215
3216 fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
3217 assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id());
3218 assert_eq!(
3219 child.inner.parent.upgrade().unwrap().actor_id(),
3220 parent.actor_id()
3221 );
3222 assert_matches!(
3223 parent.inner.children.get(&child.pid()),
3224 Some(node) if node.actor_id() == child.actor_id()
3225 );
3226 }
3227
    // Builds the chain first -> second -> third and checks spawn logging,
    // child-id allocation, parent/child links, and that stopping each actor
    // leaves it with no children.
    #[tracing_test::traced_test]
    #[async_timed_test(timeout_secs = 30)]
    async fn test_spawn_child() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();

        let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
        let second = TestActor::spawn_child(&client, &first).await;
        let third = TestActor::spawn_child(&client, &second).await;

        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                first.actor_id(),
                first.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                second.actor_id(),
                second.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                third.actor_id(),
                third.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));

        // Descendant ids are numbered sequentially off the root's id, even
        // though `third` is a grandchild.
        assert_eq!(first.actor_id().proc_id(), proc.proc_id());
        assert_eq!(second.actor_id(), &first.actor_id().child_id(1));
        assert_eq!(third.actor_id(), &first.actor_id().child_id(2));

        validate_link(third.cell(), second.cell());
        validate_link(second.cell(), first.cell());
        assert!(first.cell().inner.parent.upgrade().is_none());

        // Stop bottom-up, checking after each stop that the stopped actor has
        // no children left.
        let third_cell = third.cell().clone();
        third.drain_and_stop("test").unwrap();
        third.await;
        assert!(third_cell.inner.children.is_empty());
        drop(third_cell);
        validate_link(second.cell(), first.cell());

        let second_cell = second.cell().clone();
        second.drain_and_stop("test").unwrap();
        second.await;
        assert!(second_cell.inner.children.is_empty());
        drop(second_cell);

        let first_cell = first.cell().clone();
        first.drain_and_stop("test").unwrap();
        first.await;
        assert!(first_cell.inner.children.is_empty());
    }
3297
    // Stopping the root stops every descendant with reason "parent stopping",
    // after which their mailboxes reject new messages.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_child_lifecycle() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();

        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
        let root_1 = TestActor::spawn_child(&client, &root).await;
        let root_2 = TestActor::spawn_child(&client, &root).await;
        let root_2_1 = TestActor::spawn_child(&client, &root_2).await;

        root.drain_and_stop("test").unwrap();
        root.await;

        for actor in [root_1, root_2, root_2_1] {
            assert!(actor.send(&client, TestActorMessage::Noop()).is_err());
            assert_matches!(actor.await, ActorStatus::Stopped(reason) if reason == "parent stopping");
        }
    }
3316
3317 #[async_timed_test(timeout_secs = 30)]
3318 async fn test_parent_failure() {
3319 let proc = Proc::local();
3320 let (client, _) = proc.instance("client").unwrap();
3321 let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3324
3325 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3326 let root_1 = TestActor::spawn_child(&client, &root).await;
3327 let root_2 = TestActor::spawn_child(&client, &root).await;
3328 let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
3329
3330 root_2
3331 .send(
3332 &client,
3333 TestActorMessage::Fail(anyhow::anyhow!("some random failure")),
3334 )
3335 .unwrap();
3336 let _root_2_actor_id = root_2.actor_id().clone();
3337 assert_matches!(
3338 root_2.await,
3339 ActorStatus::Failed(err) if err.to_string() == "some random failure"
3340 );
3341
3342 assert_matches!(
3346 root.await,
3347 ActorStatus::Failed(err) if err.to_string().contains("some random failure")
3348 );
3349 assert_matches!(root_2_1.await, ActorStatus::Stopped(_));
3350 assert_matches!(root_1.await, ActorStatus::Stopped(_));
3351 }
3352
3353 #[async_timed_test(timeout_secs = 30)]
3354 async fn test_multi_handler() {
3355 #[derive(Debug)]
3359 struct TestActor(Arc<AtomicUsize>);
3360
3361 #[async_trait]
3362 impl Actor for TestActor {}
3363
3364 #[async_trait]
3365 impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
3366 async fn handle(
3367 &mut self,
3368 cx: &crate::Context<Self>,
3369 message: OncePortHandle<PortHandle<usize>>,
3370 ) -> anyhow::Result<()> {
3371 message.send(cx, cx.port())?;
3372 Ok(())
3373 }
3374 }
3375
3376 #[async_trait]
3377 impl Handler<usize> for TestActor {
3378 async fn handle(
3379 &mut self,
3380 _cx: &crate::Context<Self>,
3381 message: usize,
3382 ) -> anyhow::Result<()> {
3383 self.0.fetch_add(message, Ordering::SeqCst);
3384 Ok(())
3385 }
3386 }
3387
3388 let proc = Proc::local();
3389 let state = Arc::new(AtomicUsize::new(0));
3390 let actor = TestActor(state.clone());
3391 let handle = proc.spawn::<TestActor>("test", actor).unwrap();
3392 let (client, _) = proc.instance("client").unwrap();
3393 let (tx, rx) = client.open_once_port();
3394 handle.send(&client, tx).unwrap();
3395 let usize_handle = rx.recv().await.unwrap();
3396 usize_handle.send(&client, 123).unwrap();
3397
3398 handle.drain_and_stop("test").unwrap();
3399 handle.await;
3400
3401 assert_eq!(state.load(Ordering::SeqCst), 123);
3402 }
3403
3404 #[async_timed_test(timeout_secs = 30)]
3405 async fn test_actor_panic() {
3406 panic_handler::set_panic_hook();
3408
3409 let proc = Proc::local();
3410 let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3413
3414 let (client, _handle) = proc.instance("client").unwrap();
3415 let actor_handle = proc.spawn("test", TestActor).unwrap();
3416 actor_handle
3417 .panic(&client, "some random failure".to_string())
3418 .await
3419 .unwrap();
3420 let actor_status = actor_handle.await;
3421
3422 assert_matches!(actor_status, ActorStatus::Failed(_));
3426 if let ActorStatus::Failed(err) = actor_status {
3427 let error_msg = err.to_string();
3428 assert!(error_msg.contains("some random failure"));
3430 assert!(error_msg.contains("library/std/src/panicking.rs"));
3434 }
3435 }
3436
3437 #[async_timed_test(timeout_secs = 30)]
3438 async fn test_local_supervision_propagation() {
3439 hyperactor_telemetry::initialize_logging_for_test();
3440
3441 #[derive(Debug)]
3442 struct TestActor(Arc<AtomicBool>, bool);
3443
3444 #[async_trait]
3445 impl Actor for TestActor {
3446 async fn handle_supervision_event(
3447 &mut self,
3448 _this: &Instance<Self>,
3449 _event: &ActorSupervisionEvent,
3450 ) -> Result<bool, anyhow::Error> {
3451 if !self.1 {
3452 return Ok(false);
3453 }
3454
3455 tracing::error!(
3456 "{}: supervision event received: {:?}",
3457 _this.self_id(),
3458 _event
3459 );
3460 self.0.store(true, Ordering::SeqCst);
3461 Ok(true)
3462 }
3463 }
3464
3465 #[async_trait]
3466 impl Handler<String> for TestActor {
3467 async fn handle(
3468 &mut self,
3469 cx: &crate::Context<Self>,
3470 message: String,
3471 ) -> anyhow::Result<()> {
3472 tracing::info!("{} received message: {}", cx.self_id(), message);
3473 Err(anyhow::anyhow!(message))
3474 }
3475 }
3476
3477 let proc = Proc::local();
3478 let (client, _) = proc.instance("client").unwrap();
3479 let (reported_event, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3480
3481 let root_state = Arc::new(AtomicBool::new(false));
3482 let root_1_state = Arc::new(AtomicBool::new(false));
3483 let root_1_1_state = Arc::new(AtomicBool::new(false));
3484 let root_1_1_1_state = Arc::new(AtomicBool::new(false));
3485 let root_2_state = Arc::new(AtomicBool::new(false));
3486 let root_2_1_state = Arc::new(AtomicBool::new(false));
3487
3488 let root = proc
3489 .spawn::<TestActor>("root", TestActor(root_state.clone(), false))
3490 .unwrap();
3491 let root_1 = proc
3492 .spawn_child::<TestActor>(
3493 root.cell().clone(),
3494 TestActor(
3495 root_1_state.clone(),
3496 true, ),
3498 )
3499 .unwrap();
3500 let root_1_1 = proc
3501 .spawn_child::<TestActor>(
3502 root_1.cell().clone(),
3503 TestActor(root_1_1_state.clone(), false),
3504 )
3505 .unwrap();
3506 let root_1_1_1 = proc
3507 .spawn_child::<TestActor>(
3508 root_1_1.cell().clone(),
3509 TestActor(root_1_1_1_state.clone(), false),
3510 )
3511 .unwrap();
3512 let root_2 = proc
3513 .spawn_child::<TestActor>(root.cell().clone(), TestActor(root_2_state.clone(), false))
3514 .unwrap();
3515 let root_2_1 = proc
3516 .spawn_child::<TestActor>(
3517 root_2.cell().clone(),
3518 TestActor(root_2_1_state.clone(), false),
3519 )
3520 .unwrap();
3521
3522 root_1_1_1
3525 .send::<String>(&client, "some random failure".into())
3526 .unwrap();
3527
3528 root_2_1
3531 .send::<String>(&client, "some random failure".into())
3532 .unwrap();
3533
3534 tokio::time::sleep(Duration::from_secs(1)).await;
3535
3536 assert!(!root_state.load(Ordering::SeqCst));
3537 assert!(root_1_state.load(Ordering::SeqCst));
3538 assert!(!root_1_1_state.load(Ordering::SeqCst));
3539 assert!(!root_1_1_1_state.load(Ordering::SeqCst));
3540 assert!(!root_2_state.load(Ordering::SeqCst));
3541 assert!(!root_2_1_state.load(Ordering::SeqCst));
3542 assert_eq!(
3543 reported_event.event().map(|e| e.actor_id.clone()),
3544 Some(root_2_1.actor_id().clone())
3545 );
3546 }
3547
    // A bare `proc.instance` can spawn children and exchange messages with
    // them. Its status is Client while alive and Stopped once dropped.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_instance() {
        // Echoes the string back on the provided port.
        #[derive(Debug, Default)]
        struct TestActor;

        impl Actor for TestActor {}

        #[async_trait]
        impl Handler<(String, reference::PortRef<String>)> for TestActor {
            async fn handle(
                &mut self,
                cx: &crate::Context<Self>,
                (message, port): (String, reference::PortRef<String>),
            ) -> anyhow::Result<()> {
                port.send(cx, message)?;
                Ok(())
            }
        }

        let proc = Proc::local();

        let (instance, handle) = proc.instance("my_test_actor").unwrap();

        let child_actor = TestActor.spawn(&instance).unwrap();

        let (port, mut receiver) = instance.open_port();
        child_actor
            .send(&instance, ("hello".to_string(), port.bind()))
            .unwrap();

        let message = receiver.recv().await.unwrap();
        assert_eq!(message, "hello");

        child_actor.drain_and_stop("test").unwrap();
        child_actor.await;

        assert_eq!(*handle.status().borrow(), ActorStatus::Client);
        // Dropping the instance is what transitions it to Stopped.
        drop(instance);
        assert_matches!(*handle.status().borrow(), ActorStatus::Stopped(_));
        handle.await;
    }
3589
    // With no supervision coordinator installed, a root actor failure is
    // expected to terminate the whole process. `assert_termination` runs
    // `process` and checks the termination. NOTE(review): the `1` is presumably
    // the expected exit code; confirm against `assert_termination`.
    #[tokio::test]
    async fn test_proc_terminate_without_coordinator() {
        if std::env::var("CARGO_TEST").is_ok() {
            eprintln!("test skipped as it hangs when run by cargo in sandcastle");
            return;
        }

        let process = async {
            let proc = Proc::local();
            let root = proc.spawn("root", TestActor).unwrap();
            let (client, _handle) = proc.instance("client").unwrap();
            root.fail(&client, anyhow::anyhow!("some random failure"))
                .await
                .unwrap();
            // Keep the future alive long enough for the failure to take the process down.
            tokio::time::sleep(Duration::from_secs(30)).await;
        };

        assert_termination(|| process, 1).await.unwrap();
    }
3615
3616 fn trace_and_block(fut: impl Future) {
3617 tracing::subscriber::with_default(
3618 tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
3619 || {
3620 tokio::runtime::Builder::new_current_thread()
3621 .enable_all()
3622 .build()
3623 .unwrap()
3624 .block_on(fut)
3625 },
3626 );
3627 }
3628
    // Checks that events emitted inside handlers are captured by the actor's
    // trace recording, including the span stack of a handler that is still running.
    #[ignore = "until trace recording is turned back on"]
    #[test]
    fn test_handler_logging() {
        #[derive(Debug, Default)]
        struct LoggingActor;

        impl Actor for LoggingActor {}

        impl LoggingActor {
            // Waits until the actor has worked through every message sent
            // before this call, by meeting it at a two-party barrier.
            async fn wait(cx: &impl context::Actor, handle: &ActorHandle<Self>) {
                let barrier = Arc::new(Barrier::new(2));
                handle.send(cx, barrier.clone()).unwrap();
                barrier.wait().await;
            }
        }

        // Logs the string as the event message.
        #[async_trait]
        impl Handler<String> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: String,
            ) -> anyhow::Result<()> {
                tracing::info!("{}", message);
                Ok(())
            }
        }

        // Logs the number as a structured `number` field.
        #[async_trait]
        impl Handler<u64> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: u64,
            ) -> anyhow::Result<()> {
                tracing::event!(Level::INFO, number = message);
                Ok(())
            }
        }

        #[async_trait]
        impl Handler<Arc<Barrier>> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: Arc<Barrier>,
            ) -> anyhow::Result<()> {
                message.wait().await;
                Ok(())
            }
        }

        // Enters "child_span" and parks between two barriers, so the test can
        // sample the recording's span stacks while the span is active.
        #[async_trait]
        impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                barriers: Arc<(Barrier, Barrier)>,
            ) -> anyhow::Result<()> {
                let inner = tracing::span!(Level::INFO, "child_span");
                let _inner_guard = inner.enter();
                barriers.0.wait().await;
                barriers.1.wait().await;
                Ok(())
            }
        }

        trace_and_block(async {
            let proc = Proc::local();
            let (client, _) = proc.instance("client").unwrap();
            let handle = LoggingActor.spawn_detached().unwrap();
            handle.send(&client, "hello world".to_string()).unwrap();
            handle
                .send(&client, "hello world again".to_string())
                .unwrap();
            handle.send(&client, 123u64).unwrap();

            LoggingActor::wait(&client, &handle).await;

            let events = handle.cell().inner.recording.tail();
            assert_eq!(events.len(), 3);
            assert_eq!(events[0].json_value(), json!({ "message": "hello world" }));
            assert_eq!(
                events[1].json_value(),
                json!({ "message": "hello world again" })
            );
            assert_eq!(events[2].json_value(), json!({ "number": 123 }));

            // Sample the span stacks between the two barriers, i.e. while the
            // handler is inside "child_span".
            let stacks = {
                let barriers = Arc::new((Barrier::new(2), Barrier::new(2)));
                handle.send(&client, Arc::clone(&barriers)).unwrap();
                barriers.0.wait().await;
                let stacks = handle.cell().inner.recording.stacks();
                barriers.1.wait().await;
                stacks
            };
            assert_eq!(stacks.len(), 1);
            assert_eq!(stacks[0].len(), 1);
            assert_eq!(stacks[0][0].name(), "child_span");
        })
    }
3730
    // After a clean stop, sends through a surviving handle clone fail with
    // OwnerTerminated carrying the Stopped status and its reason.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_mailbox_closed_with_owner_stopped_reason() {
        use crate::actor::ActorStatus;
        use crate::mailbox::MailboxErrorKind;
        use crate::mailbox::MailboxSenderErrorKind;

        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let actor_handle = proc.spawn("test", TestActor).unwrap();

        // Clone first, because awaiting `actor_handle` consumes it.
        let handle_for_send = actor_handle.clone();

        actor_handle.drain_and_stop("healthy shutdown").unwrap();
        actor_handle.await;

        let result = handle_for_send.send(&client, TestActorMessage::Noop());

        assert!(result.is_err(), "send should fail when actor is stopped");
        let err = result.unwrap_err();
        assert_matches!(
            err.kind(),
            MailboxSenderErrorKind::Mailbox(mailbox_err)
                if matches!(
                    mailbox_err.kind(),
                    MailboxErrorKind::OwnerTerminated(ActorStatus::Stopped(reason)) if reason == "healthy shutdown"
                )
        );
    }
3762
    // After a handler failure, sends through a surviving handle clone fail
    // with OwnerTerminated carrying the Failed status and the original error text.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_mailbox_closed_with_owner_failed_reason() {
        use crate::actor::ActorErrorKind;
        use crate::actor::ActorStatus;
        use crate::mailbox::MailboxErrorKind;
        use crate::mailbox::MailboxSenderErrorKind;

        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();

        let actor_handle = proc.spawn("test", TestActor).unwrap();

        // Clone first, because awaiting `actor_handle` consumes it.
        let handle_for_send = actor_handle.clone();

        actor_handle
            .send(
                &client,
                TestActorMessage::Fail(anyhow::anyhow!("intentional failure")),
            )
            .unwrap();
        actor_handle.await;

        let result = handle_for_send.send(&client, TestActorMessage::Noop());

        assert!(result.is_err(), "send should fail when actor has failed");
        let err = result.unwrap_err();
        assert_matches!(
            err.kind(),
            MailboxSenderErrorKind::Mailbox(mailbox_err)
                if matches!(
                    mailbox_err.kind(),
                    MailboxErrorKind::OwnerTerminated(ActorStatus::Failed(ActorErrorKind::Generic(msg)))
                        if msg.contains("intentional failure")
                )
        );
    }
3805
3806 async fn wait_for_terminated_snapshot(
3810 proc: &Proc,
3811 actor_id: &reference::ActorId,
3812 ) -> crate::introspect::IntrospectResult {
3813 for i in 0..1000 {
3817 if let Some(snapshot) = proc.terminated_snapshot(actor_id) {
3818 return snapshot;
3819 }
3820 if i < 50 {
3821 tokio::task::yield_now().await;
3822 } else {
3823 tokio::time::sleep(Duration::from_millis(50)).await;
3824 }
3825 }
3826 panic!("timed out waiting for terminated snapshot for {}", actor_id);
3827 }
3828
    // A cleanly stopped actor leaves a terminated snapshot with a "stopped…"
    // status. Its id moves from the live set to the terminated set.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_terminated_snapshot_stored_on_stop() {
        let proc = Proc::local();
        let (_client, _client_handle) = proc.instance("client").unwrap();

        let handle = proc.spawn::<TestActor>("actor", TestActor).unwrap();
        let actor_id = handle.actor_id().clone();

        // No snapshot exists while the actor is alive.
        assert!(proc.terminated_snapshot(&actor_id).is_none());
        assert!(!proc.all_terminated_actor_ids().contains(&actor_id));

        handle.drain_and_stop("test").unwrap();
        handle.await;

        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
        // The snapshot's attrs are JSON-encoded and include the final status.
        let attrs: hyperactor_config::Attrs =
            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
        let status = attrs
            .get(crate::introspect::STATUS)
            .expect("must have status");
        assert!(
            status.starts_with("stopped"),
            "expected stopped status, got: {}",
            status
        );

        assert!(proc.all_terminated_actor_ids().contains(&actor_id));
        assert!(
            !proc.all_actor_ids().contains(&actor_id),
            "stopped actor should not appear in live actor IDs"
        );
    }
3874
3875 #[async_timed_test(timeout_secs = 30)]
3882 async fn test_terminated_snapshot_stored_on_failure() {
3883 let proc = Proc::local();
3884 let (client, _client_handle) = proc.instance("client").unwrap();
3885 ProcSupervisionCoordinator::set(&proc).await.unwrap();
3887
3888 let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
3889 let actor_id = handle.actor_id().clone();
3890
3891 handle
3893 .send(&client, TestActorMessage::Fail(anyhow::anyhow!("boom")))
3894 .unwrap();
3895 handle.await;
3896
3897 let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
3898 let attrs: hyperactor_config::Attrs =
3899 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
3900 let status = attrs
3901 .get(crate::introspect::STATUS)
3902 .expect("must have status");
3903 assert!(
3904 status.starts_with("failed"),
3905 "expected failed status, got: {}",
3906 status
3907 );
3908 }
3909
3910 #[async_timed_test(timeout_secs = 30)]
3912 async fn test_supervision_event_stored_on_failure() {
3913 let proc = Proc::local();
3914 let (client, _client_handle) = proc.instance("client").unwrap();
3915 ProcSupervisionCoordinator::set(&proc).await.unwrap();
3916
3917 let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
3918 let actor_id = handle.actor_id().clone();
3919 let cell = handle.cell().clone();
3920
3921 handle
3922 .send(&client, TestActorMessage::Fail(anyhow::anyhow!("boom")))
3923 .unwrap();
3924 handle.await;
3925
3926 let event = cell.supervision_event();
3927 assert!(event.is_some(), "failed actor must have supervision_event");
3928 let event = event.unwrap();
3929 assert_eq!(event.actor_id, actor_id);
3930 assert!(event.actor_status.is_failed());
3931 assert_eq!(event.actually_failing_actor().unwrap().actor_id, actor_id);
3933 }
3934
    // Even a clean stop records a supervision event. Its status is Stopped and
    // it is not an error.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_supervision_event_on_clean_stop() {
        let proc = Proc::local();
        let (_client, _client_handle) = proc.instance("client").unwrap();

        let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
        // Keep the cell, because awaiting the handle consumes it.
        let cell = handle.cell().clone();

        handle.drain_and_stop("test").unwrap();
        handle.await;

        let event = cell
            .supervision_event()
            .expect("cleanly stopped actor must have a supervision_event");
        assert!(
            matches!(event.actor_status, ActorStatus::Stopped(_)),
            "expected Stopped status, got {:?}",
            event.actor_status
        );
        assert!(!event.is_error());
    }
3957
    // The proc supervision coordinator is notified of clean stops too, with a
    // non-error Stopped event for the stopped actor.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_supervision_coordinator_receives_clean_stop() {
        let proc = Proc::local();
        let (_client, _client_handle) = proc.instance("client").unwrap();
        let (mut reported_event, _coordinator_handle) =
            ProcSupervisionCoordinator::set(&proc).await.unwrap();

        let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
        let actor_id = handle.actor_id().clone();

        handle.drain_and_stop("test").unwrap();
        handle.await;

        let event = reported_event.recv().await;
        assert_eq!(event.actor_id, actor_id);
        assert!(
            matches!(event.actor_status, ActorStatus::Stopped(_)),
            "expected Stopped status, got {:?}",
            event.actor_status
        );
        assert!(!event.is_error());
    }
3980
    // Destroying the proc must still deliver a Stopped event for every user
    // actor to the coordinator. Per the test name, this relies on the
    // coordinator shutting down after them; the assertions below check only
    // that all events arrive.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_coordinator_shuts_down_last_during_destroy() {
        let mut proc = Proc::local();
        let (_client, _client_handle) = proc.instance("client").unwrap();
        let (mut reported_event, _coordinator_handle) =
            ProcSupervisionCoordinator::set(&proc).await.unwrap();

        let mut actor_ids = Vec::new();
        for i in 0..3 {
            let handle = proc
                .spawn::<TestActor>(&format!("actor_{i}"), TestActor)
                .unwrap();
            actor_ids.push(handle.actor_id().clone());
        }

        proc.destroy_and_wait::<()>(Duration::from_secs(5), None, "test")
            .await
            .unwrap();

        // Collect one event per spawned actor; arrival order is not asserted.
        let mut received_ids = Vec::new();
        for _ in 0..actor_ids.len() {
            let event = reported_event.recv().await;
            assert!(
                matches!(event.actor_status, ActorStatus::Stopped(_)),
                "expected Stopped, got {:?}",
                event.actor_status
            );
            received_ids.push(event.actor_id);
        }
        received_ids.sort();
        actor_ids.sort();
        assert_eq!(received_ids, actor_ids);
    }
4020
4021 #[async_timed_test(timeout_secs = 30)]
4023 async fn test_supervision_event_on_propagated_failure() {
4024 let proc = Proc::local();
4025 let (client, _client_handle) = proc.instance("client").unwrap();
4026 ProcSupervisionCoordinator::set(&proc).await.unwrap();
4027
4028 let parent = proc.spawn::<TestActor>("parent", TestActor).unwrap();
4029 let parent_cell = parent.cell().clone();
4030 let (tx, rx) = oneshot::channel();
4032 parent.send(&client, TestActorMessage::Spawn(tx)).unwrap();
4033 let child = rx.await.unwrap();
4034 let child_id = child.actor_id().clone();
4035
4036 child
4039 .send(
4040 &client,
4041 TestActorMessage::Fail(anyhow::anyhow!("child boom")),
4042 )
4043 .unwrap();
4044 parent.await;
4045
4046 let event = parent_cell.supervision_event();
4047 assert!(
4048 event.is_some(),
4049 "parent must have supervision_event from propagated failure"
4050 );
4051 let event = event.unwrap();
4052 assert_eq!(event.actually_failing_actor().unwrap().actor_id, child_id);
4054 }
4055
4056 #[async_timed_test(timeout_secs = 30)]
4065 async fn test_resolve_actor_ref_none_for_terminal_actor() {
4066 let proc = Proc::local();
4067 let (_client, _client_handle) = proc.instance("client").unwrap();
4068
4069 let handle = proc.spawn::<TestActor>("target", TestActor).unwrap();
4070 let actor_ref: reference::ActorRef<TestActor> = handle.bind();
4071
4072 assert!(
4074 proc.resolve_actor_ref(&actor_ref).is_some(),
4075 "live actor should be resolvable"
4076 );
4077
4078 handle.drain_and_stop("test").unwrap();
4079 handle.await;
4080
4081 assert!(
4084 proc.resolve_actor_ref(&actor_ref).is_none(),
4085 "terminal actor must not be resolvable"
4086 );
4087 }
4088
4089 #[async_timed_test(timeout_secs = 30)]
4091 async fn test_terminated_snapshot_has_failure_info() {
4092 let proc = Proc::local();
4093 let (client, _client_handle) = proc.instance("client").unwrap();
4094 ProcSupervisionCoordinator::set(&proc).await.unwrap();
4095
4096 let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
4097 let actor_id = handle.actor_id().clone();
4098
4099 handle
4100 .send(&client, TestActorMessage::Fail(anyhow::anyhow!("kaboom")))
4101 .unwrap();
4102 handle.await;
4103
4104 let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
4105 let attrs: hyperactor_config::Attrs =
4106 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
4107 let status = attrs
4108 .get(crate::introspect::STATUS)
4109 .expect("must have status");
4110 assert!(
4111 status.starts_with("failed"),
4112 "expected failed status, got: {}",
4113 status
4114 );
4115 let err_msg = attrs
4116 .get(crate::introspect::FAILURE_ERROR_MESSAGE)
4117 .expect("failed actor must have failure_error_message");
4118 assert!(!err_msg.is_empty());
4119 let root_cause = attrs
4120 .get(crate::introspect::FAILURE_ROOT_CAUSE_ACTOR)
4121 .expect("must have root_cause_actor");
4122 assert_eq!(root_cause, &actor_id);
4123 assert_eq!(
4124 attrs.get(crate::introspect::FAILURE_IS_PROPAGATED),
4125 Some(&false)
4126 );
4127 assert!(
4128 attrs.get(crate::introspect::FAILURE_OCCURRED_AT).is_some(),
4129 "failed actor must have occurred_at"
4130 );
4131 }
4132
4133 #[async_timed_test(timeout_secs = 30)]
4135 async fn test_propagated_failure_info() {
4136 let proc = Proc::local();
4137 let (client, _client_handle) = proc.instance("client").unwrap();
4138 ProcSupervisionCoordinator::set(&proc).await.unwrap();
4139
4140 let parent = proc.spawn::<TestActor>("parent", TestActor).unwrap();
4141 let parent_id = parent.actor_id().clone();
4142
4143 let (tx, rx) = oneshot::channel();
4144 parent.send(&client, TestActorMessage::Spawn(tx)).unwrap();
4145 let child = rx.await.unwrap();
4146 let child_id = child.actor_id().clone();
4147
4148 child
4149 .send(
4150 &client,
4151 TestActorMessage::Fail(anyhow::anyhow!("child fail")),
4152 )
4153 .unwrap();
4154 parent.await;
4155
4156 let snapshot = wait_for_terminated_snapshot(&proc, &parent_id).await;
4157 let attrs: hyperactor_config::Attrs =
4158 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
4159 let root_cause = attrs
4160 .get(crate::introspect::FAILURE_ROOT_CAUSE_ACTOR)
4161 .expect("propagated failure must have root_cause_actor");
4162 assert_eq!(root_cause, &child_id);
4163 assert_eq!(
4164 attrs.get(crate::introspect::FAILURE_IS_PROPAGATED),
4165 Some(&true)
4166 );
4167 }
4168
4169 #[async_timed_test(timeout_secs = 30)]
4171 async fn test_spawn_with_name_creates_descriptive_name() {
4172 let proc = Proc::local();
4173 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4174 let handle = proc
4175 .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
4176 .unwrap();
4177 assert_eq!(handle.actor_id().name(), "my_controller");
4178 assert_eq!(handle.actor_id().pid(), 1);
4179 }
4180
4181 #[async_timed_test(timeout_secs = 30)]
4183 async fn test_spawn_with_name_increments_index() {
4184 let proc = Proc::local();
4185 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4186 let first = proc
4187 .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
4188 .unwrap();
4189 let second = proc
4190 .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
4191 .unwrap();
4192 assert_eq!(first.actor_id().pid(), 1);
4193 assert_eq!(second.actor_id().pid(), 2);
4194 }
4195
4196 #[async_timed_test(timeout_secs = 30)]
4199 async fn test_spawn_with_name_preserves_supervision() {
4200 let proc = Proc::local();
4201 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4202 let child = proc
4203 .spawn_named_child(root.cell().clone(), "supervised_child", TestActor)
4204 .unwrap();
4205 let child_cell = child.cell();
4206 let parent = child_cell.parent().expect("named child must have a parent");
4207 assert_eq!(parent.actor_id(), root.actor_id());
4208 }
4209
4210 #[async_timed_test(timeout_secs = 30)]
4212 async fn test_spawn_unchanged() {
4213 let proc = Proc::local();
4214 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4215 let child = proc.spawn_child(root.cell().clone(), TestActor).unwrap();
4216 assert_eq!(child.actor_id().name(), root.actor_id().name());
4217 }
4218
4219 #[async_timed_test(timeout_secs = 30)]
4221 async fn test_spawn_with_name_different_names_different_pids() {
4222 let proc = Proc::local();
4223 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4224 let a = proc
4225 .spawn_named_child(root.cell().clone(), "controller_a", TestActor)
4226 .unwrap();
4227 let b = proc
4228 .spawn_named_child(root.cell().clone(), "controller_b", TestActor)
4229 .unwrap();
4230 assert_ne!(a.actor_id().pid(), b.actor_id().pid());
4231 assert_eq!(a.actor_id().name(), "controller_a");
4232 assert_eq!(b.actor_id().name(), "controller_b");
4233 }
4234
4235 #[async_timed_test(timeout_secs = 30)]
4237 async fn test_spawn_with_name_no_child_overwrite() {
4238 let proc = Proc::local();
4239 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4240 let _a = proc
4241 .spawn_named_child(root.cell().clone(), "ctrl", TestActor)
4242 .unwrap();
4243 let _b = proc
4244 .spawn_named_child(root.cell().clone(), "ctrl", TestActor)
4245 .unwrap();
4246 let _c = proc.spawn_child(root.cell().clone(), TestActor).unwrap();
4247 assert_eq!(root.cell().child_count(), 3);
4248 }
4249
4250 #[async_timed_test(timeout_secs = 30)]
4252 async fn test_spawn_with_name_does_not_pollute_roots() {
4253 let proc = Proc::local();
4254 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4255 let _child = proc
4256 .spawn_named_child(root.cell().clone(), "foo", TestActor)
4257 .unwrap();
4258 let result = proc.spawn::<TestActor>("foo", TestActor);
4261 assert!(result.is_ok(), "named child should not pollute roots");
4262 }
4263
4264 #[async_timed_test(timeout_secs = 30)]
4266 async fn test_ai3_controller_actor_ids_unique_across_parents_same_proc() {
4267 let proc = Proc::local();
4268 let parent_a = proc.spawn::<TestActor>("parent_a", TestActor).unwrap();
4269 let parent_b = proc.spawn::<TestActor>("parent_b", TestActor).unwrap();
4270
4271 let ctrl_a = proc
4273 .spawn_named_child(parent_a.cell().clone(), "controller_mesh_a", TestActor)
4274 .unwrap();
4275 let ctrl_b = proc
4276 .spawn_named_child(parent_b.cell().clone(), "controller_mesh_b", TestActor)
4277 .unwrap();
4278
4279 assert_ne!(
4280 ctrl_a.actor_id(),
4281 ctrl_b.actor_id(),
4282 "controller ActorIds must be unique across parents"
4283 );
4284 }
4285
4286 #[async_timed_test(timeout_secs = 30)]
4288 async fn test_ai3_no_controller_overwrite_in_parent_or_proc_maps() {
4289 let proc = Proc::local();
4290 let parent_a = proc.spawn::<TestActor>("parent_a", TestActor).unwrap();
4291 let parent_b = proc.spawn::<TestActor>("parent_b", TestActor).unwrap();
4292
4293 let ctrl_a = proc
4294 .spawn_named_child(parent_a.cell().clone(), "controller_mesh_a", TestActor)
4295 .unwrap();
4296 let ctrl_b = proc
4297 .spawn_named_child(parent_b.cell().clone(), "controller_mesh_b", TestActor)
4298 .unwrap();
4299
4300 assert!(
4302 proc.get_instance(ctrl_a.actor_id()).is_some(),
4303 "ctrl_a must be resolvable"
4304 );
4305 assert!(
4306 proc.get_instance(ctrl_b.actor_id()).is_some(),
4307 "ctrl_b must be resolvable"
4308 );
4309 assert_eq!(parent_a.cell().child_count(), 1);
4311 assert_eq!(parent_b.cell().child_count(), 1);
4312 }
4313
4314 #[async_timed_test(timeout_secs = 30)]
4316 async fn test_stopped_snapshot_has_no_failure_info() {
4317 let proc = Proc::local();
4318 let (_client, _client_handle) = proc.instance("client").unwrap();
4319
4320 let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
4321 let actor_id = handle.actor_id().clone();
4322
4323 handle.drain_and_stop("test").unwrap();
4324 handle.await;
4325
4326 let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
4327 let attrs: hyperactor_config::Attrs =
4328 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
4329 let status = attrs
4330 .get(crate::introspect::STATUS)
4331 .expect("must have status");
4332 assert!(
4333 status.starts_with("stopped"),
4334 "expected stopped, got: {}",
4335 status
4336 );
4337 assert!(
4338 attrs
4339 .get(crate::introspect::FAILURE_ERROR_MESSAGE)
4340 .is_none(),
4341 "stopped actor must not have failure attrs"
4342 );
4343 }
4344}