1use std::any::Any;
15use std::any::TypeId;
16use std::collections::HashMap;
17use std::fmt;
18use std::future::Future;
19use std::hash::Hash;
20use std::hash::Hasher;
21use std::ops::Deref;
22use std::panic;
23use std::panic::AssertUnwindSafe;
24use std::panic::Location;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::sync::OnceLock;
28use std::sync::Weak;
29use std::sync::atomic::AtomicU64;
30use std::sync::atomic::AtomicUsize;
31use std::sync::atomic::Ordering;
32use std::time::Duration;
33use std::time::SystemTime;
34
35use async_trait::async_trait;
36use dashmap::DashMap;
37use dashmap::mapref::entry::Entry;
38use dashmap::mapref::multiple::RefMulti;
39use futures::FutureExt;
40use hyperactor_macros::AttrValue;
41use hyperactor_macros::Named;
42use hyperactor_telemetry::recorder;
43use hyperactor_telemetry::recorder::Recording;
44use serde::Deserialize;
45use serde::Serialize;
46use tokio::sync::mpsc;
47use tokio::sync::watch;
48use tokio::task::JoinHandle;
49use tracing::Instrument;
50use tracing::Level;
51use tracing::Span;
52use uuid::Uuid;
53
54use crate as hyperactor;
55use crate::Actor;
56use crate::ActorRef;
57use crate::Handler;
58use crate::Message;
59use crate::Named as _;
60use crate::RemoteMessage;
61use crate::actor::ActorError;
62use crate::actor::ActorErrorKind;
63use crate::actor::ActorHandle;
64use crate::actor::ActorStatus;
65use crate::actor::Binds;
66use crate::actor::Referable;
67use crate::actor::RemoteHandles;
68use crate::actor::Signal;
69use crate::attrs::Attrs;
70use crate::channel;
71use crate::channel::ChannelAddr;
72use crate::channel::ChannelError;
73use crate::clock::Clock;
74use crate::clock::ClockKind;
75use crate::clock::RealClock;
76use crate::config;
77use crate::context;
78use crate::data::Serialized;
79use crate::data::TypeInfo;
80use crate::declare_attrs;
81use crate::mailbox::BoxedMailboxSender;
82use crate::mailbox::DeliveryError;
83use crate::mailbox::DialMailboxRouter;
84use crate::mailbox::IntoBoxedMailboxSender as _;
85use crate::mailbox::Mailbox;
86use crate::mailbox::MailboxMuxer;
87use crate::mailbox::MailboxSender;
88use crate::mailbox::MailboxServer as _;
89use crate::mailbox::MessageEnvelope;
90use crate::mailbox::OncePortHandle;
91use crate::mailbox::OncePortReceiver;
92use crate::mailbox::PanickingMailboxSender;
93use crate::mailbox::PortHandle;
94use crate::mailbox::PortReceiver;
95use crate::mailbox::Undeliverable;
96use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
97use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
98use crate::metrics::ACTOR_MESSAGES_RECEIVED;
99use crate::ordering::OrderedSender;
100use crate::ordering::OrderedSenderError;
101use crate::ordering::Sequencer;
102use crate::ordering::ordered_channel;
103use crate::panic_handler;
104use crate::reference::ActorId;
105use crate::reference::Index;
106use crate::reference::PortId;
107use crate::reference::ProcId;
108use crate::reference::id;
109use crate::supervision::ActorSupervisionEvent;
110
/// Source of ranks for procs created by [`Proc::local`]; each call takes the
/// next value, so every local proc in this process gets a distinct rank.
static NEXT_LOCAL_RANK: AtomicUsize = AtomicUsize::new(0);
113
/// A proc: a container of actors identified by a [`ProcId`], which routes
/// messages to the actors bound on it and forwards everything else.
///
/// `Proc` is a handle onto shared state held behind an `Arc`; clones refer
/// to the same proc.
#[derive(Clone, Debug)]
pub struct Proc {
    inner: Arc<ProcState>,
}
124
/// Shared state behind a [`Proc`] handle.
#[derive(Debug)]
struct ProcState {
    /// The identity of this proc.
    proc_id: ProcId,

    /// Routes envelopes whose destination actor lives on `proc_id` to the
    /// mailboxes bound on this proc (see `Proc::bind_mailbox` and
    /// `Instance::new`).
    proc_muxer: MailboxMuxer,

    /// Sender used for envelopes addressed to actors on other procs.
    forwarder: BoxedMailboxSender,

    /// Root actor names allocated on this proc. Each name maps to the counter
    /// from which the pids of that root's children are drawn.
    roots: DashMap<String, AtomicUsize>,

    /// Weak references to the root actors spawned on this proc.
    ledger: ActorLedger,

    /// Weak references to actor instances, consulted by `resolve_actor_ref`.
    // NOTE(review): nothing in this view inserts into this map; presumably
    // `InstanceCell::new` registers itself here — confirm.
    instances: DashMap<ActorId, WeakInstanceCell>,

    /// Port receiving supervision events from actors that have no parent.
    /// It can be set at most once.
    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,

    /// Clock associated with this proc (used, e.g., for delayed self-messages).
    clock: ClockKind,
}
154
155impl Drop for ProcState {
156 fn drop(&mut self) {
157 tracing::info!(
161 proc_id = %self.proc_id,
162 name = "ProcStatus",
163 status = "Dropped"
164 );
165 }
166}
167
/// A point-in-time view of every live root actor on a proc, together with
/// its full tree of children.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActorLedgerSnapshot {
    /// Tree snapshot of each live root actor, keyed by the root's id.
    pub roots: HashMap<ActorId, ActorTreeSnapshot>,
}
175
/// A serializable copy of a telemetry `recorder::Event` recorded for an actor.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Event {
    /// Time at which the event was recorded.
    pub time: SystemTime,
    /// The event's fields as `(name, value)` pairs.
    pub fields: Vec<(String, recorder::Value)>,
    /// Sequence number copied from the recorder event.
    pub seq: usize,
}
186
187impl From<recorder::Event> for Event {
188 fn from(event: recorder::Event) -> Event {
189 Event {
190 time: event.time,
191 fields: event.fields(),
192 seq: event.seq,
193 }
194 }
195}
196
/// A point-in-time view of one actor and, recursively, all of its children.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActorTreeSnapshot {
    /// The actor's pid (taken from its `ActorId`).
    pub pid: Index,

    /// The actor's type name.
    pub type_name: String,

    /// The actor's status when the snapshot was taken.
    pub status: ActorStatus,

    /// Counters collected for the actor.
    pub stats: ActorStats,

    /// The actor's exported named ports, as `u64` key -> port name.
    pub handlers: HashMap<u64, String>,

    /// Snapshots of the actor's children, keyed by child pid.
    pub children: HashMap<Index, ActorTreeSnapshot>,

    /// The most recent events from the actor's telemetry recording.
    pub events: Vec<Event>,

    /// Span stacks from the actor's telemetry recording; each stack is
    /// listed as span names.
    pub spans: Vec<Vec<String>>,
}
227
228impl Hash for ActorTreeSnapshot {
229 fn hash<H: Hasher>(&self, state: &mut H) {
230 self.pid.hash(state);
231 }
232}
233
/// Counters collected for a single actor.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Default)]
pub struct ActorStats {
    /// Incremented once per completed iteration of the actor's message loop.
    /// That loop also handles signals and supervision events, so this is not
    /// strictly a count of user messages.
    num_processed_messages: u64,
}
241
242impl fmt::Display for ActorStats {
243 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244 write!(f, "num_processed_messages={}", self.num_processed_messages)
245 }
246}
247
/// Registry of the root actors spawned on a proc.
///
/// The ledger holds only weak references, which are upgraded on demand.
#[derive(Debug)]
struct ActorLedger {
    /// Root actors, keyed by actor id.
    roots: DashMap<ActorId, WeakInstanceCell>,
}
253
254impl ActorLedger {
255 fn new() -> Self {
256 Self {
257 roots: DashMap::new(),
258 }
259 }
260
261 fn insert(
262 &self,
263 root_actor_id: ActorId,
264 root_actor_cell: WeakInstanceCell,
265 ) -> Result<(), anyhow::Error> {
266 match self.roots.insert(root_actor_id.clone(), root_actor_cell) {
267 None => Ok(()),
268 Some(current_cell) => {
271 let debugging_msg = match current_cell.upgrade() {
272 Some(cell) => format!("the stored cell's actor ID is {}", cell.actor_id()),
273 None => "the stored cell has been dropped".to_string(),
274 };
275
276 Err(anyhow::anyhow!(
277 "actor '{root_actor_id}' has already been added to ledger: {debugging_msg}"
278 ))
279 }
280 }
281 }
282
283 fn snapshot(&self) -> ActorLedgerSnapshot {
285 let roots = self
286 .roots
287 .iter()
288 .flat_map(|r| {
289 let (actor_id, weak_cell) = r.pair();
290 weak_cell
294 .upgrade()
295 .map(|cell| (actor_id.clone(), Self::get_actor_tree_snapshot(&cell)))
296 })
297 .collect();
298
299 ActorLedgerSnapshot { roots }
300 }
301
302 fn get_actor_tree_snapshot(cell: &InstanceCell) -> ActorTreeSnapshot {
303 let children = cell
305 .child_iter()
306 .map(|child| (child.pid(), Self::get_actor_tree_snapshot(child.value())))
307 .collect();
308
309 ActorTreeSnapshot {
310 pid: cell.actor_id().pid(),
311 type_name: cell.inner.actor_type.type_name().to_string(),
312 status: cell.status().borrow().clone(),
313 stats: ActorStats {
314 num_processed_messages: cell.inner.num_processed_messages.load(Ordering::SeqCst),
315 },
316 handlers: cell
317 .inner
318 .exported_named_ports
319 .iter()
320 .map(|entry| (*entry.key(), entry.value().to_string()))
321 .collect(),
322 children,
323 events: cell
324 .inner
325 .recording
326 .tail()
327 .into_iter()
328 .map(Event::from)
329 .collect(),
330 spans: cell
331 .inner
332 .recording
333 .stacks()
334 .into_iter()
335 .map(|stack| {
336 stack
337 .into_iter()
338 .map(|meta| meta.name().to_string())
339 .collect()
340 })
341 .collect(),
342 }
343 }
344}
345
346impl Proc {
347 pub fn new(proc_id: ProcId, forwarder: BoxedMailboxSender) -> Self {
349 Self::new_with_clock(proc_id, forwarder, ClockKind::default())
350 }
351
352 pub async fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
354 let (addr, rx) = channel::serve(addr)?;
355 let proc_id = ProcId::Direct(addr, name);
356 let proc = Self::new(proc_id, DialMailboxRouter::new().into_boxed());
357 proc.clone().serve(rx);
358 Ok(proc)
359 }
360
361 pub fn direct_with_default(
363 addr: ChannelAddr,
364 name: String,
365 default: BoxedMailboxSender,
366 ) -> Result<Self, ChannelError> {
367 let (addr, rx) = channel::serve(addr)?;
368 let proc_id = ProcId::Direct(addr, name);
369 let proc = Self::new(
370 proc_id,
371 DialMailboxRouter::new_with_default(default).into_boxed(),
372 );
373 proc.clone().serve(rx);
374 Ok(proc)
375 }
376
377 pub fn new_with_clock(
379 proc_id: ProcId,
380 forwarder: BoxedMailboxSender,
381 clock: ClockKind,
382 ) -> Self {
383 tracing::info!(
384 proc_id = %proc_id,
385 name = "ProcStatus",
386 status = "Created"
387 );
388 Self {
389 inner: Arc::new(ProcState {
390 proc_id,
391 proc_muxer: MailboxMuxer::new(),
392 forwarder,
393 roots: DashMap::new(),
394 ledger: ActorLedger::new(),
395 instances: DashMap::new(),
396 supervision_coordinator_port: OnceLock::new(),
397 clock,
398 }),
399 }
400 }
401
402 pub fn set_supervision_coordinator(
405 &self,
406 port: PortHandle<ActorSupervisionEvent>,
407 ) -> Result<(), anyhow::Error> {
408 self.state()
409 .supervision_coordinator_port
410 .set(port)
411 .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
412 }
413
414 fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
415 let result = match self.state().supervision_coordinator_port.get() {
416 Some(port) => port.send(event.clone()).map_err(anyhow::Error::from),
417 None => Err(anyhow::anyhow!(
418 "coordinator port is not set for proc {}",
419 self.proc_id(),
420 )),
421 };
422 if let Err(err) = result {
423 tracing::error!(
424 "proc {}: could not propagate supervision event {} due to error: {:?}: crashing",
425 self.proc_id(),
426 event,
427 err
428 );
429
430 std::process::exit(1);
431 }
432 }
433
434 pub fn local() -> Self {
437 let proc_id = ProcId::Ranked(id!(local), NEXT_LOCAL_RANK.fetch_add(1, Ordering::Relaxed));
440 Proc::new(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
442 }
443
444 pub fn proc_id(&self) -> &ProcId {
446 &self.state().proc_id
447 }
448
449 pub fn forwarder(&self) -> &BoxedMailboxSender {
452 &self.inner.forwarder
453 }
454
455 fn state(&self) -> &ProcState {
457 self.inner.as_ref()
458 }
459
460 pub fn clock(&self) -> &ClockKind {
462 &self.state().clock
463 }
464
465 pub fn ledger_snapshot(&self) -> ActorLedgerSnapshot {
467 self.state().ledger.snapshot()
468 }
469
470 pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
472 let actor_id: ActorId = self.allocate_root_id(name)?;
473 Ok(self.bind_mailbox(actor_id))
474 }
475
476 pub fn attach_child(&self, parent_id: &ActorId) -> Result<Mailbox, anyhow::Error> {
478 let actor_id: ActorId = self.allocate_child_id(parent_id)?;
479 Ok(self.bind_mailbox(actor_id))
480 }
481
482 fn bind_mailbox(&self, actor_id: ActorId) -> Mailbox {
484 let mbox = Mailbox::new(actor_id, BoxedMailboxSender::new(self.downgrade()));
485
486 self.state().proc_muxer.bind_mailbox(mbox.clone());
489 mbox
490 }
491
492 pub fn attach_actor<R, M>(
495 &self,
496 name: &str,
497 ) -> Result<(Instance<()>, ActorRef<R>, PortReceiver<M>), anyhow::Error>
498 where
499 M: RemoteMessage,
500 R: Referable + RemoteHandles<M>,
501 {
502 let (instance, _handle) = self.instance(name)?;
503 let (_handle, rx) = instance.bind_actor_port::<M>();
504 let actor_ref = ActorRef::attest(instance.self_id().clone());
505 Ok((instance, actor_ref, rx))
506 }
507
508 #[hyperactor::observe_result("Proc")]
511 pub async fn spawn<A: Actor>(
512 &self,
513 name: &str,
514 params: A::Params,
515 ) -> Result<ActorHandle<A>, anyhow::Error> {
516 let actor_id = self.allocate_root_id(name)?;
517 let span = tracing::span!(
518 Level::INFO,
519 "spawn_actor",
520 actor_name = name,
521 actor_type = std::any::type_name::<A>(),
522 actor_id = actor_id.to_string(),
523 );
524 let (instance, mut actor_loop_receivers, work_rx) = {
525 let _guard = span.clone().entered();
526 Instance::new(self.clone(), actor_id.clone(), false, None)
527 };
528 let actor = A::new(params).instrument(span.clone()).await?;
529 self.state()
533 .ledger
534 .insert(actor_id.clone(), instance.inner.cell.downgrade())?;
535
536 Ok(instance
537 .start(actor, actor_loop_receivers.take().unwrap(), work_rx)
538 .instrument(span)
539 .await)
540 }
541
542 pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
548 let actor_id = self.allocate_root_id(name)?;
549 let _ = tracing::debug_span!(
550 "actor_instance",
551 actor_name = name,
552 actor_type = std::any::type_name::<()>(),
553 actor_id = actor_id.to_string(),
554 );
555
556 let (instance, _, _) = Instance::new(self.clone(), actor_id.clone(), true, None);
557 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
558
559 instance.change_status(ActorStatus::Client);
560
561 Ok((instance, handle))
562 }
563
564 fn child_instance(
566 &self,
567 parent: InstanceCell,
568 ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
569 let actor_id = self.allocate_child_id(parent.actor_id())?;
570 let _ = tracing::debug_span!(
571 "child_actor_instance",
572 parent_actor_id = %parent.actor_id(),
573 actor_type = std::any::type_name::<()>(),
574 actor_id = %actor_id,
575 );
576
577 let (instance, _, _) = Instance::new(self.clone(), actor_id, false, Some(parent));
578 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
579 instance.change_status(ActorStatus::Client);
580 Ok((instance, handle))
581 }
582
583 async fn spawn_child<A: Actor>(
589 &self,
590 parent: InstanceCell,
591 params: A::Params,
592 ) -> Result<ActorHandle<A>, anyhow::Error> {
593 let actor_id = self.allocate_child_id(parent.actor_id())?;
594 let (instance, mut actor_loop_receivers, work_rx) =
595 Instance::new(self.clone(), actor_id, false, Some(parent.clone()));
596 let actor = A::new(params).await?;
597 Ok(instance
598 .start(actor, actor_loop_receivers.take().unwrap(), work_rx)
599 .await)
600 }
601
602 pub fn abort_root_actor(
606 &self,
607 root: &ActorId,
608 this_handle: Option<&JoinHandle<()>>,
609 ) -> Option<impl Future<Output = ActorId>> {
610 self.state()
611 .ledger
612 .roots
613 .get(root)
614 .into_iter()
615 .flat_map(|e| e.upgrade())
616 .map(|cell| {
617 let r1 = root.clone();
618 let r2 = root.clone();
619 let skip_abort = this_handle.is_some_and(|this_h| {
621 cell.inner
622 .actor_task_handle
623 .get()
624 .is_some_and(|other_h| std::ptr::eq(this_h, other_h))
625 });
626 async move {
632 tokio::task::spawn_blocking(move || {
633 if !skip_abort {
634 let h = cell.inner.actor_task_handle.wait();
635 tracing::debug!("{}: aborting {:?}", r1, h);
636 h.abort();
637 }
638 })
639 .await
640 .unwrap();
641 r2
642 }
643 })
644 .next()
645 }
646
647 pub fn stop_actor(&self, actor_id: &ActorId) -> Option<watch::Receiver<ActorStatus>> {
650 if let Some(entry) = self.state().ledger.roots.get(actor_id) {
651 match entry.value().upgrade() {
652 None => None, Some(cell) => {
654 tracing::info!("sending stop signal to {}", cell.actor_id());
655 if let Err(err) = cell.signal(Signal::DrainAndStop) {
656 tracing::error!(
657 "{}: failed to send stop signal to pid {}: {:?}",
658 self.proc_id(),
659 cell.pid(),
660 err
661 );
662 None
663 } else {
664 Some(cell.status().clone())
665 }
666 }
667 }
668 } else {
669 tracing::error!("no actor {} found in {} roots", actor_id, self.proc_id());
670 None
671 }
672 }
673
674 pub async fn destroy_and_wait<A: Actor>(
682 &mut self,
683 timeout: Duration,
684 cx: Option<&Context<'_, A>>,
685 ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
686 self.destroy_and_wait_except_current::<A>(timeout, cx, false)
687 .await
688 }
689
690 #[hyperactor::instrument]
700 pub async fn destroy_and_wait_except_current<A: Actor>(
701 &mut self,
702 timeout: Duration,
703 cx: Option<&Context<'_, A>>,
704 except_current: bool,
705 ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
706 tracing::debug!("{}: proc stopping", self.proc_id());
707
708 let (this_handle, this_actor_id) = cx.map_or((None, None), |cx| {
709 (
710 Some(cx.actor_task_handle().expect("cannot call destroy_and_wait from inside an actor unless actor has finished starting")),
711 Some(cx.self_id())
712 )
713 });
714
715 let mut statuses = HashMap::new();
716 for actor_id in self
717 .state()
718 .ledger
719 .roots
720 .iter()
721 .map(|entry| entry.key().clone())
722 .collect::<Vec<_>>()
723 {
724 if let Some(status) = self.stop_actor(&actor_id) {
725 statuses.insert(actor_id, status);
726 }
727 }
728 tracing::debug!("{}: proc stopped", self.proc_id());
729
730 let waits: Vec<_> = statuses
731 .iter_mut()
732 .filter(|(actor_id, _)| Some(*actor_id) != this_actor_id)
733 .map(|(actor_id, root)| {
734 let actor_id = actor_id.clone();
735 async move {
736 RealClock
737 .timeout(
738 timeout,
739 root.wait_for(|state: &ActorStatus| state.is_terminal()),
740 )
741 .await
742 .ok()
743 .map(|_| actor_id)
744 }
745 })
746 .collect();
747
748 let results = futures::future::join_all(waits).await;
749 let stopped_actors: Vec<_> = results
750 .iter()
751 .filter_map(|actor_id| actor_id.as_ref())
752 .cloned()
753 .collect();
754 let aborted_actors: Vec<_> = statuses
755 .iter()
756 .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
757 .map(|(actor_id, _)| {
758 let f = self.abort_root_actor(actor_id, this_handle);
759 async move {
760 let _ = if let Some(f) = f { Some(f.await) } else { None };
761 actor_id.clone()
768 }
769 })
770 .collect();
771 let aborted_actors = futures::future::join_all(aborted_actors).await;
772
773 if let Some(this_handle) = this_handle
774 && let Some(this_actor_id) = this_actor_id
775 && !except_current
776 {
777 tracing::debug!("{}: aborting (delayed) {:?}", this_actor_id, this_handle);
778 this_handle.abort()
779 };
780
781 tracing::info!(
782 "destroy_and_wait: {} actors stopped, {} actors aborted",
783 stopped_actors.len(),
784 aborted_actors.len()
785 );
786 Ok((stopped_actors, aborted_actors))
787 }
788
789 pub fn resolve_actor_ref<R: Actor + Referable>(
798 &self,
799 actor_ref: &ActorRef<R>,
800 ) -> Option<ActorHandle<R>> {
801 self.inner
802 .instances
803 .get(actor_ref.actor_id())?
804 .upgrade()?
805 .downcast_handle()
806 }
807
808 fn allocate_root_id(&self, name: &str) -> Result<ActorId, anyhow::Error> {
810 let name = name.to_string();
811 match self.state().roots.entry(name.to_string()) {
812 Entry::Vacant(entry) => {
813 entry.insert(AtomicUsize::new(1));
814 }
815 Entry::Occupied(_) => {
816 anyhow::bail!("an actor with name '{}' has already been spawned", name)
817 }
818 }
819 Ok(ActorId(self.state().proc_id.clone(), name.to_string(), 0))
820 }
821
822 #[hyperactor::instrument(fields(actor_name=parent_id.name()))]
824 pub(crate) fn allocate_child_id(&self, parent_id: &ActorId) -> Result<ActorId, anyhow::Error> {
825 assert_eq!(*parent_id.proc_id(), self.state().proc_id);
826 let pid = match self.state().roots.get(parent_id.name()) {
827 None => anyhow::bail!(
828 "no actor named {} in proc {}",
829 parent_id.name(),
830 self.state().proc_id
831 ),
832 Some(next_pid) => next_pid.fetch_add(1, Ordering::Relaxed),
833 };
834 Ok(parent_id.child_id(pid))
835 }
836
837 fn downgrade(&self) -> WeakProc {
838 WeakProc::new(self)
839 }
840}
841
842#[async_trait]
843impl MailboxSender for Proc {
844 fn post_unchecked(
845 &self,
846 envelope: MessageEnvelope,
847 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
848 ) {
849 if envelope.dest().actor_id().proc_id() == &self.state().proc_id {
850 self.state().proc_muxer.post(envelope, return_handle)
851 } else {
852 self.state().forwarder.post(envelope, return_handle)
853 }
854 }
855}
856
/// A non-owning reference to a [`Proc`]; it does not keep the proc alive.
#[derive(Debug)]
struct WeakProc(Weak<ProcState>);
859
860impl WeakProc {
861 fn new(proc: &Proc) -> Self {
862 Self(Arc::downgrade(&proc.inner))
863 }
864
865 fn upgrade(&self) -> Option<Proc> {
866 self.0.upgrade().map(|inner| Proc { inner })
867 }
868}
869
870#[async_trait]
871impl MailboxSender for WeakProc {
872 fn post_unchecked(
873 &self,
874 envelope: MessageEnvelope,
875 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
876 ) {
877 match self.upgrade() {
878 Some(proc) => proc.post(envelope, return_handle),
879 None => envelope.undeliverable(
880 DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
881 return_handle,
882 ),
883 }
884 }
885}
886
/// A single unit of work for an actor of type `A`.
///
/// Wraps a boxed one-shot closure that, given mutable access to the actor and
/// its [`Instance`], returns the future performing the work. Work cells are
/// received from the actor's work channel and executed by the actor loop.
struct WorkCell<A: Actor + Send>(
    Box<
        dyn for<'a> FnOnce(
                &'a mut A,
                &'a mut Instance<A>,
            )
                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
            + Send
            + Sync,
    >,
);
900
901impl<A: Actor + Send> WorkCell<A> {
902 fn new(
904 f: impl for<'a> FnOnce(
905 &'a mut A,
906 &'a mut Instance<A>,
907 )
908 -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
909 + Send
910 + Sync
911 + 'static,
912 ) -> Self {
913 Self(Box::new(f))
914 }
915
916 fn handle<'a>(
918 self,
919 actor: &'a mut A,
920 instance: &'a mut Instance<A>,
921 ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
922 (self.0)(actor, instance)
923 }
924}
925
/// Pairs a borrowed actor [`Instance`] with a set of message headers.
///
/// A `Context` dereferences to its instance.
pub struct Context<'a, A: Actor> {
    /// The actor instance this context refers to.
    instance: &'a Instance<A>,
    /// Headers exposed through `headers()`.
    headers: Attrs,
}
931
932impl<'a, A: Actor> Context<'a, A> {
933 pub fn new(instance: &'a Instance<A>, headers: Attrs) -> Self {
935 Self { instance, headers }
936 }
937
938 pub fn headers(&self) -> &Attrs {
940 &self.headers
941 }
942}
943
944impl<A: Actor> Deref for Context<'_, A> {
945 type Target = Instance<A>;
946
947 fn deref(&self) -> &Self::Target {
948 self.instance
949 }
950}
951
/// An actor instance: the actor's identity, mailbox, ports, and status.
///
/// All of the instance's state is held behind an `Arc`.
pub struct Instance<A: Actor> {
    inner: Arc<InstanceState<A>>,
}
958
/// Shared state behind an [`Instance`].
struct InstanceState<A: Actor> {
    /// The proc this instance belongs to.
    proc: Proc,

    /// The instance's cell, also referenced by actor handles and (for roots)
    /// the proc's ledger.
    cell: InstanceCell,

    /// The instance's mailbox; its actor id is this instance's id.
    mailbox: Mailbox,

    /// The instance's ports, shared with its cell and handles.
    ports: Arc<Ports<A>>,

    /// Publishes the instance's status; the matching receiver is held by `cell`.
    status_tx: watch::Sender<ActorStatus>,

    /// A unique id for this instance (a UUIDv7 generated in `Instance::new`).
    id: Uuid,

    /// Sequencer created from `id` in `Instance::new`.
    sequencer: Sequencer,
}
980
981impl<A: Actor> InstanceState<A> {
982 fn self_id(&self) -> &ActorId {
983 self.mailbox.actor_id()
984 }
985}
986
987impl<A: Actor> Drop for InstanceState<A> {
988 fn drop(&mut self) {
989 self.status_tx.send_if_modified(|status| {
990 if status.is_terminal() {
991 false
992 } else {
993 tracing::info!(
994 name = "ActorStatus",
995 actor_id = %self.self_id(),
996 actor_name = self.self_id().name(),
997 status = "Stopped",
998 prev_status = status.arm().unwrap_or("unknown"),
999 "instance is dropped",
1000 );
1001 *status = ActorStatus::Stopped;
1002 true
1003 }
1004 });
1005 }
1006}
1007
1008impl<A: Actor> Instance<A> {
    /// Creates a new instance for `actor_id` on `proc` and binds its mailbox
    /// to the proc's muxer.
    ///
    /// Returns:
    /// - the instance;
    /// - for non-`detached` instances, the receivers for the actor loop's
    ///   signal and supervision ports (`None` when `detached`);
    /// - the receiving end of the instance's work channel.
    ///
    /// `parent` is passed to the instance cell; `None` marks a root or
    /// client instance.
    fn new(
        proc: Proc,
        actor_id: ActorId,
        detached: bool,
        parent: Option<InstanceCell>,
    ) -> (
        Self,
        Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,
        mpsc::UnboundedReceiver<WorkCell<A>>,
    ) {
        // The mailbox sends through a weak proc reference so it does not keep
        // the proc alive.
        let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
        let (work_tx, work_rx) = ordered_channel(
            actor_id.to_string(),
            config::global::get(config::ENABLE_CLIENT_SEQ_ASSIGNMENT),
        );
        let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
        proc.state().proc_muxer.bind_mailbox(mailbox.clone());
        let (status_tx, status_rx) = watch::channel(ActorStatus::Created);

        // Prefer registered type info; fall back to the Rust type name.
        let actor_type = match TypeInfo::of::<A>() {
            Some(info) => ActorType::Named(info),
            None => ActorType::Anonymous(std::any::type_name::<A>()),
        };
        // Detached instances have no actor loop, so they get no control ports.
        // NOTE(review): the order in which these ports are opened presumably
        // determines their port indices; keep it stable.
        let actor_loop_ports = if detached {
            None
        } else {
            let (signal_port, signal_receiver) = ports.open_message_port().unwrap();
            let (supervision_port, supervision_receiver) = mailbox.open_port();
            Some((
                (signal_port, supervision_port),
                (signal_receiver, supervision_receiver),
            ))
        };

        // Split into the sending halves (kept by the cell) and the receiving
        // halves (returned to the caller to drive the actor loop).
        let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();

        let cell = InstanceCell::new(
            actor_id,
            actor_type,
            proc.clone(),
            actor_loop,
            status_rx,
            parent,
            ports.clone(),
        );
        let instance_id = Uuid::now_v7();
        let inner = Arc::new(InstanceState {
            proc,
            cell,
            mailbox,
            ports,
            status_tx,
            sequencer: Sequencer::new(instance_id),
            id: instance_id,
        });
        (Self { inner }, actor_loop_receivers, work_rx)
    }
1068
1069 #[track_caller]
1072 fn change_status(&self, new: ActorStatus) {
1073 let old = self.inner.status_tx.send_replace(new.clone());
1074 assert!(
1080 !old.is_terminal() && !new.is_terminal()
1081 || !old.is_terminal() && new.is_terminal()
1082 || old == new,
1083 "actor changing status illegally, only allow non-terminal -> non-terminal \
1084 and non-terminal -> terminal statuses. actor_id={}, prev_status={}, status={}",
1085 self.self_id(),
1086 old,
1087 new
1088 );
1089 if !((old.is_idle() && new.is_processing()) || (old.is_processing() && new.is_idle())) {
1093 let new_status = new.arm().unwrap_or("unknown");
1094 let change_reason = match new {
1095 ActorStatus::Failed(reason) => reason.to_string(),
1096 _ => "".to_string(),
1097 };
1098 tracing::info!(
1099 name = "ActorStatus",
1100 actor_id = %self.self_id(),
1101 actor_name = self.self_id().name(),
1102 status = new_status,
1103 prev_status = old.arm().unwrap_or("unknown"),
1104 caller = %Location::caller(),
1105 change_reason,
1106 );
1107 }
1108 }
1109
1110 fn is_terminal(&self) -> bool {
1111 self.inner.status_tx.borrow().is_terminal()
1112 }
1113
1114 fn is_stopping(&self) -> bool {
1115 self.inner.status_tx.borrow().is_stopping()
1116 }
1117
1118 pub fn self_id(&self) -> &ActorId {
1120 self.inner.self_id()
1121 }
1122
1123 pub fn stop(&self) -> Result<(), ActorError> {
1125 tracing::info!("Instance::stop called, {}", self.inner.cell.actor_id());
1126 self.inner.cell.signal(Signal::DrainAndStop)
1127 }
1128
1129 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1134 self.inner.mailbox.open_port()
1135 }
1136
1137 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1141 self.inner.mailbox.open_once_port()
1142 }
1143
    /// Posts the serialized `message`, with `headers`, to `port_id` through
    /// this instance's `context::MailboxExt::post` implementation.
    pub fn post(&self, port_id: PortId, headers: Attrs, message: Serialized) {
        // NOTE(review): the meaning of the trailing `true` flag is defined by
        // `MailboxExt::post`, which is not visible here; confirm and document.
        <Self as context::MailboxExt>::post(self, port_id, headers, message, true)
    }
1148
1149 pub fn self_message_with_delay<M>(&self, message: M, delay: Duration) -> Result<(), ActorError>
1151 where
1152 M: Message,
1153 A: Handler<M>,
1154 {
1155 let port = self.port();
1156 let self_id = self.self_id().clone();
1157 let clock = self.inner.proc.state().clock.clone();
1158 tokio::spawn(async move {
1159 clock.non_advancing_sleep(delay).await;
1160 if let Err(e) = port.send(message) {
1161 tracing::info!("{}: error sending delayed message: {}", self_id, e);
1164 }
1165 });
1166 Ok(())
1167 }
1168
1169 #[hyperactor::instrument_infallible(fields(actor_id=self.inner.cell.actor_id().clone().to_string(), actor_name=self.inner.cell.actor_id().name()))]
1172 async fn start(
1173 self,
1174 actor: A,
1175 actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1176 work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
1177 ) -> ActorHandle<A> {
1178 let instance_cell = self.inner.cell.clone();
1179 let actor_id = self.inner.cell.actor_id().clone();
1180 let actor_handle = ActorHandle::new(self.inner.cell.clone(), self.inner.ports.clone());
1181 let actor_task_handle = A::spawn_server_task(
1182 panic_handler::with_backtrace_tracking(self.serve(
1183 actor,
1184 actor_loop_receivers,
1185 work_rx,
1186 ))
1187 .instrument(Span::current()),
1188 );
1189 tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
1190 instance_cell
1191 .inner
1192 .actor_task_handle
1193 .set(actor_task_handle)
1194 .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));
1195
1196 actor_handle
1197 }
1198
    /// Body of the actor's server task.
    ///
    /// Runs the actor (and the shutdown of its children) to completion, then
    /// publishes the final status and reports the outcome:
    /// - with a parent: any supervision event goes to the parent, followed by
    ///   `Signal::ChildStopped`;
    /// - without a parent: any supervision event goes to the proc's
    ///   supervision coordinator.
    async fn serve(
        mut self,
        mut actor: A,
        actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
    ) {
        let result = self
            .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
            .await;

        // `run_actor_tree` always leaves the instance in the Stopping status.
        assert!(self.is_stopping());
        let event = match result {
            Ok(_) => {
                // Clean exit: nothing to report to supervisors.
                self.change_status(ActorStatus::Stopped);
                None
            }
            Err(err) => {
                match *err.kind {
                    // Propagate an unhandled supervision event as-is, adopting
                    // its (terminal) status as our own.
                    ActorErrorKind::UnhandledSupervisionEvent(box event) => {
                        assert!(event.actor_status.is_terminal());
                        self.change_status(event.actor_status.clone());
                        Some(event)
                    }
                    // Any other failure becomes a new supervision event
                    // describing this actor's failure.
                    _ => {
                        let error_kind = ActorErrorKind::Generic(err.kind.to_string());
                        let status = ActorStatus::Failed(error_kind);
                        self.change_status(status.clone());
                        Some(ActorSupervisionEvent::new(
                            self.inner.cell.actor_id().clone(),
                            actor.display_name(),
                            status,
                            None,
                        ))
                    }
                }
            }
        };

        if let Some(parent) = self.inner.cell.maybe_unlink_parent() {
            if let Some(event) = event {
                parent.send_supervision_event_or_crash(event);
            }
            // Let the parent know this child is gone; failure is only logged.
            if let Err(err) = parent.signal(Signal::ChildStopped(self.inner.cell.pid())) {
                tracing::error!(
                    "{}: failed to send stop message to parent pid {}: {:?}",
                    self.self_id(),
                    parent.pid(),
                    err
                );
            }
        } else {
            if let Some(event) = event {
                // No parent: escalate to the proc, which exits the process if
                // no coordinator can take the event.
                self.inner.proc.handle_supervision_event(event);
            }
        }
    }
1273
    /// Runs the actor's message loop, then shuts down its children and runs
    /// the actor's cleanup.
    ///
    /// Steps:
    /// 1. Panics escaping `run` are caught and turned into an
    ///    `ActorErrorKind::panic` error.
    /// 2. The instance moves to `Stopping` and every child is sent
    ///    `Signal::Stop`.
    /// 3. The loop waits for children to report `ChildStopped`, giving up
    ///    (and unlinking all children) after 500 ms without a signal.
    /// 4. `actor.cleanup` runs, bounded by `config::CLEANUP_TIMEOUT`. It is
    ///    skipped if the actor panicked.
    ///
    /// If both the actor and its cleanup failed, the actor's error is
    /// returned and the cleanup error is only logged.
    async fn run_actor_tree(
        &mut self,
        actor: &mut A,
        mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
    ) -> Result<(), ActorError> {
        let mut did_panic = false;
        let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers, work_rx))
            .catch_unwind()
            .await
        {
            Ok(result) => result,
            Err(_) => {
                did_panic = true;
                // Recover the panic message/backtrace captured by the panic handler.
                let panic_info = panic_handler::take_panic_info()
                    .map(|info| info.to_string())
                    .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
                Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::panic(anyhow::anyhow!(panic_info)),
                ))
            }
        };

        assert!(!self.is_terminal());
        self.change_status(ActorStatus::Stopping);
        if let Err(err) = &result {
            tracing::error!("{}: actor failure: {}", self.self_id(), err);
        }

        // Children that cannot be signalled are unlinked only after the loop,
        // presumably so the child map is not mutated while `child_iter`
        // borrows it.
        let mut to_unlink = Vec::new();
        for child in self.inner.cell.child_iter() {
            if let Err(err) = child.value().signal(Signal::Stop) {
                tracing::error!(
                    "{}: failed to send stop signal to child pid {}: {:?}",
                    self.self_id(),
                    child.key(),
                    err
                );
                to_unlink.push(child.value().clone());
            }
        }
        for child in to_unlink {
            self.inner.cell.unlink(&child);
        }

        // Wait for children to stop. The 500 ms timeout restarts after every
        // received signal. Non-`ChildStopped` signals are ignored here.
        let (mut signal_receiver, _) = actor_loop_receivers;
        while self.inner.cell.child_count() > 0 {
            match RealClock
                .timeout(Duration::from_millis(500), signal_receiver.recv())
                .await
            {
                Ok(signal) => {
                    // NOTE(review): a receive error returns early here, which
                    // discards `result` and skips cleanup; confirm intended.
                    if let Signal::ChildStopped(pid) = signal? {
                        assert!(self.inner.cell.get_child(pid).is_none());
                    }
                }
                Err(_) => {
                    tracing::warn!(
                        "timeout waiting for ChildStopped signal from child on actor: {}, ignoring",
                        self.self_id()
                    );
                    self.inner.cell.unlink_all();
                    break;
                }
            }
        }
        // Actor state after a panic may be inconsistent, so cleanup only runs
        // after a normal (successful or erroring) exit.
        let cleanup_result = if !did_panic {
            let cleanup_timeout = config::global::get(config::CLEANUP_TIMEOUT);
            match RealClock
                .timeout(cleanup_timeout, actor.cleanup(self, result.as_ref().err()))
                .await
            {
                Ok(Ok(x)) => Ok(x),
                Ok(Err(e)) => Err(ActorError::new(self.self_id(), ActorErrorKind::cleanup(e))),
                Err(e) => Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::cleanup(e.into()),
                )),
            }
        } else {
            Ok(())
        };
        if let Err(ref actor_err) = result {
            if let Err(ref err) = cleanup_result {
                tracing::warn!(
                    cleanup_err = %err,
                    %actor_err,
                    "ignoring cleanup error after actor error",
                );
            }
        }
        // The actor's own error takes precedence over a cleanup error.
        result.and(cleanup_result)
    }
1389
1390 async fn run(
1392 &mut self,
1393 actor: &mut A,
1394 actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1395 work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
1396 ) -> Result<(), ActorError> {
1397 let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;
1398
1399 self.change_status(ActorStatus::Initializing);
1400 actor
1401 .init(self)
1402 .await
1403 .map_err(|err| ActorError::new(self.self_id(), ActorErrorKind::init(err)))?;
1404 let need_drain;
1405 'messages: loop {
1406 self.change_status(ActorStatus::Idle);
1407 let metric_pairs =
1408 hyperactor_telemetry::kv_pairs!("actor_id" => self.self_id().to_string());
1409 tokio::select! {
1410 work = work_rx.recv() => {
1411 ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
1412 ACTOR_MESSAGE_QUEUE_SIZE.add(-1, metric_pairs);
1413 let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
1414 let work = work.expect("inconsistent work queue state");
1415 if let Err(err) = work.handle(actor, self).await {
1416 for supervision_event in supervision_event_receiver.drain() {
1417 self.handle_supervision_event(actor, supervision_event).await?;
1418 }
1419 let kind = ActorErrorKind::processing(err);
1420 return Err(ActorError {
1421 actor_id: Box::new(self.self_id().clone()),
1422 kind: Box::new(kind),
1423 });
1424 }
1425 }
1426 signal = signal_receiver.recv() => {
1427 let signal = signal.map_err(ActorError::from);
1428 tracing::debug!("Received signal {signal:?}");
1429 match signal? {
1430 signal@(Signal::Stop | Signal::DrainAndStop) => {
1431 need_drain = matches!(signal, Signal::DrainAndStop);
1432 break 'messages;
1433 },
1434 Signal::ChildStopped(pid) => {
1435 assert!(self.inner.cell.get_child(pid).is_none());
1436 },
1437 }
1438 }
1439 Ok(supervision_event) = supervision_event_receiver.recv() => {
1440 self.handle_supervision_event(actor, supervision_event).await?;
1441 }
1442 }
1443 self.inner
1444 .cell
1445 .inner
1446 .num_processed_messages
1447 .fetch_add(1, Ordering::SeqCst);
1448 }
1449
1450 if need_drain {
1451 let mut n = 0;
1452 while let Ok(work) = work_rx.try_recv() {
1453 if let Err(err) = work.handle(actor, self).await {
1454 return Err(ActorError::new(
1455 self.self_id(),
1456 ActorErrorKind::processing(err),
1457 ));
1458 }
1459 n += 1;
1460 }
1461 tracing::debug!("drained {} messages", n);
1462 }
1463 tracing::debug!("exited actor loop: {}", self.self_id());
1464 Ok(())
1465 }
1466
1467 async fn handle_supervision_event(
1468 &self,
1469 actor: &mut A,
1470 supervision_event: ActorSupervisionEvent,
1471 ) -> Result<(), ActorError> {
1472 match actor
1474 .handle_supervision_event(self, &supervision_event)
1475 .await
1476 {
1477 Ok(true) => {
1478 Ok(())
1480 }
1481 Ok(false) => {
1482 let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
1483 Err(ActorError::new(self.self_id(), kind))
1484 }
1485 Err(err) => {
1486 let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
1489 err.to_string(),
1490 Box::new(supervision_event),
1491 );
1492 Err(ActorError::new(self.self_id(), kind))
1493 }
1494 }
1495 }
1496
    /// Handles a single message of type `M` on `actor`.
    ///
    /// Sets the instance status to `Processing`, recording the current time and,
    /// when `type_info` is available, the message type name plus the arm name
    /// returned by `TypeInfo::arm_unchecked`. It then calls
    /// `log_message_latency_if_sampling` with the headers and invokes the
    /// actor's `Handler<M>` implementation with a fresh [`Context`] built from
    /// `headers`.
    ///
    /// # Safety
    ///
    /// If `type_info` is `Some`, it must describe type `M`: the message is handed
    /// to `TypeInfo::arm_unchecked` as an untyped `*const ()` pointer.
    /// (`Ports::get` satisfies this by looking it up from `TypeId::of::<M>()`.)
    #[hyperactor::instrument(fields(actor_id = self.self_id().to_string(), actor_name = self.self_id().name()))]
    async unsafe fn handle_message<M: Message>(
        &mut self,
        actor: &mut A,
        type_info: Option<&'static TypeInfo>,
        headers: Attrs,
        message: M,
    ) -> Result<(), anyhow::Error>
    where
        A: Handler<M>,
    {
        // (type name, optional arm name) describing the handler being run.
        let handler = type_info.map(|info| {
            (
                info.typename().to_string(),
                unsafe {
                    // SAFETY: per this function's contract, `info` describes `M`,
                    // and `&message` is a live, properly aligned pointer to an `M`.
                    info.arm_unchecked(&message as *const M as *const ())
                        .map(str::to_string)
                },
            )
        });

        self.change_status(ActorStatus::Processing(
            self.clock().system_time_now(),
            handler,
        ));
        crate::mailbox::headers::log_message_latency_if_sampling(
            &headers,
            self.self_id().to_string(),
        );

        let context = Context::new(self, headers);
        actor.handle(&context, message).await
    }
1534
1535 pub(crate) async fn spawn<C: Actor>(
1537 &self,
1538 params: C::Params,
1539 ) -> anyhow::Result<ActorHandle<C>> {
1540 self.inner
1541 .proc
1542 .spawn_child(self.inner.cell.clone(), params)
1543 .await
1544 }
1545
1546 pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> {
1548 self.inner.proc.child_instance(self.inner.cell.clone())
1549 }
1550
1551 pub fn port<M: Message>(&self) -> PortHandle<M>
1554 where
1555 A: Handler<M>,
1556 {
1557 self.inner.ports.get()
1558 }
1559
1560 pub fn handle(&self) -> ActorHandle<A> {
1562 ActorHandle::new(self.inner.cell.clone(), Arc::clone(&self.inner.ports))
1563 }
1564
1565 pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
1567 self.inner.cell.bind(self.inner.ports.as_ref())
1568 }
1569
1570 #[doc(hidden)]
1572 pub fn mailbox_for_py(&self) -> &Mailbox {
1573 &self.inner.mailbox
1574 }
1575
1576 pub fn clock(&self) -> &(impl Clock + use<A>) {
1578 &self.inner.proc.state().clock
1579 }
1580
1581 pub fn proc(&self) -> &Proc {
1583 &self.inner.proc
1584 }
1585
1586 #[doc(hidden)]
1590 pub fn clone_for_py(&self) -> Self {
1591 Self {
1592 inner: Arc::clone(&self.inner),
1593 }
1594 }
1595
1596 fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1598 self.inner.cell.inner.actor_task_handle.get()
1599 }
1600
1601 pub fn sequencer(&self) -> &Sequencer {
1603 &self.inner.sequencer
1604 }
1605
    /// Returns the UUID stored for this instance (`inner.id`).
    pub fn instance_id(&self) -> Uuid {
        self.inner.id
    }
1610}
1611
1612impl<A: Actor> context::Mailbox for Instance<A> {
1613 fn mailbox(&self) -> &Mailbox {
1614 &self.inner.mailbox
1615 }
1616}
1617
1618impl<A: Actor> context::Mailbox for Context<'_, A> {
1619 fn mailbox(&self) -> &Mailbox {
1620 &self.instance.inner.mailbox
1621 }
1622}
1623
1624impl<A: Actor> context::Mailbox for &Instance<A> {
1625 fn mailbox(&self) -> &Mailbox {
1626 &self.inner.mailbox
1627 }
1628}
1629
1630impl<A: Actor> context::Mailbox for &Context<'_, A> {
1631 fn mailbox(&self) -> &Mailbox {
1632 &self.instance.inner.mailbox
1633 }
1634}
1635
// `context::Actor` implementations: each resolves to the underlying
// `Instance<A>`, relying on deref coercion for `Context` and for references.
impl<A: Actor> context::Actor for Instance<A> {
    type A = A;
    // An instance is its own actor context.
    fn instance(&self) -> &Instance<A> {
        self
    }
}

impl<A: Actor> context::Actor for Context<'_, A> {
    type A = A;
    // Coerces the handler context to the instance it wraps.
    fn instance(&self) -> &Instance<A> {
        self
    }
}

impl<A: Actor> context::Actor for &Instance<A> {
    type A = A;
    // Auto-derefs through the extra reference.
    fn instance(&self) -> &Instance<A> {
        self
    }
}

impl<A: Actor> context::Actor for &Context<'_, A> {
    type A = A;
    // Auto-derefs through the reference and then the context.
    fn instance(&self) -> &Instance<A> {
        self
    }
}
1663
1664impl Instance<()> {
1665 pub fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1667 assert!(
1668 self.actor_task_handle().is_none(),
1669 "can only bind actor port on instance with no running actor task"
1670 );
1671 self.inner.mailbox.bind_actor_port()
1672 }
1673}
1674
/// How an instance's actor type is identified.
#[derive(Debug)]
enum ActorType {
    /// Identified by its registered [`TypeInfo`].
    Named(&'static TypeInfo),
    /// Identified only by a plain type-name string.
    Anonymous(&'static str),
}
1680
1681impl ActorType {
1682 fn type_name(&self) -> &'static str {
1684 match self {
1685 Self::Named(info) => info.typename(),
1686 Self::Anonymous(name) => name,
1687 }
1688 }
1689}
1690
/// A shared, cheaply clonable handle (an `Arc`) to an actor instance's
/// bookkeeping state: identity, status, parent/child links, and ports.
#[derive(Clone, Debug)]
pub struct InstanceCell {
    inner: Arc<InstanceCellState>,
}
1700
/// The state shared by all clones of an [`InstanceCell`].
#[derive(Debug)]
struct InstanceCellState {
    /// The id of the actor this cell belongs to.
    actor_id: ActorId,

    /// How the actor's type is identified (see [`ActorType::type_name`]).
    actor_type: ActorType,

    /// The proc the actor lives in; its instance table tracks this cell weakly.
    proc: Proc,

    /// Signal and supervision-event ports feeding the actor loop. `None` for a
    /// detached actor, in which case signals are logged and dropped.
    actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,

    /// Receiver side of the actor's status channel.
    status: watch::Receiver<ActorStatus>,

    /// Weak back-reference to the parent cell (empty for root actors).
    parent: WeakInstanceCell,

    /// Strong references to linked children, keyed by their pid.
    children: DashMap<Index, InstanceCell>,

    /// Join handle of the spawned actor task; can be set at most once.
    actor_task_handle: OnceLock<JoinHandle<()>>,

    /// Port index -> message type name for ports exported by `InstanceCell::bind`.
    exported_named_ports: DashMap<u64, &'static str>,

    /// Counter incremented once per iteration of the actor's message loop.
    num_processed_messages: AtomicU64,

    /// Telemetry recording created with `record(64)`; NOTE(review): presumably
    /// a bounded buffer of recent events — confirm against `hyperactor_telemetry`.
    recording: Recording,

    /// Type-erased `Ports<A>`, recovered by `InstanceCell::downcast_handle`.
    ports: Arc<dyn Any + Send + Sync>,
}
1741
1742impl InstanceCellState {
1743 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1746 self.parent
1747 .upgrade()
1748 .filter(|parent| parent.inner.unlink(self))
1749 }
1750
1751 fn unlink(&self, child: &InstanceCellState) -> bool {
1753 assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
1754 self.children.remove(&child.actor_id.pid()).is_some()
1755 }
1756}
1757
1758impl InstanceCell {
1759 fn new(
1762 actor_id: ActorId,
1763 actor_type: ActorType,
1764 proc: Proc,
1765 actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
1766 status: watch::Receiver<ActorStatus>,
1767 parent: Option<InstanceCell>,
1768 ports: Arc<dyn Any + Send + Sync>,
1769 ) -> Self {
1770 let _ais = actor_id.to_string();
1771 let cell = Self {
1772 inner: Arc::new(InstanceCellState {
1773 actor_id: actor_id.clone(),
1774 actor_type,
1775 proc: proc.clone(),
1776 actor_loop,
1777 status,
1778 parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
1779 children: DashMap::new(),
1780 actor_task_handle: OnceLock::new(),
1781 exported_named_ports: DashMap::new(),
1782 num_processed_messages: AtomicU64::new(0),
1783 recording: hyperactor_telemetry::recorder().record(64),
1784 ports,
1785 }),
1786 };
1787 cell.maybe_link_parent();
1788 proc.inner
1789 .instances
1790 .insert(actor_id.clone(), cell.downgrade());
1791 cell
1792 }
1793
1794 fn wrap(inner: Arc<InstanceCellState>) -> Self {
1795 Self { inner }
1796 }
1797
1798 pub(crate) fn actor_id(&self) -> &ActorId {
1800 &self.inner.actor_id
1801 }
1802
1803 pub(crate) fn pid(&self) -> Index {
1805 self.inner.actor_id.pid()
1806 }
1807
1808 #[allow(dead_code)]
1810 pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1811 self.inner.actor_task_handle.get()
1812 }
1813
1814 pub(crate) fn status(&self) -> &watch::Receiver<ActorStatus> {
1816 &self.inner.status
1817 }
1818
1819 pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
1821 if let Some((signal_port, _)) = &self.inner.actor_loop {
1822 signal_port.send(signal).map_err(ActorError::from)
1823 } else {
1824 tracing::warn!(
1825 "{}: attempted to send signal {} to detached actor",
1826 self.inner.actor_id,
1827 signal
1828 );
1829 Ok(())
1830 }
1831 }
1832
1833 pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) {
1842 match &self.inner.actor_loop {
1843 Some((_, supervision_port)) => {
1844 if let Err(err) = supervision_port.send(event) {
1845 tracing::error!(
1846 "{}: failed to send supervision event to actor: {:?}. Crash the process.",
1847 self.actor_id(),
1848 err
1849 );
1850 std::process::exit(1);
1851 }
1852 }
1853 None => {
1854 tracing::error!(
1855 "{}: failed: {}: cannot send supervision event to detached actor: crashing",
1856 self.actor_id(),
1857 event,
1858 );
1859 std::process::exit(1);
1860 }
1861 }
1862 }
1863
1864 pub fn downgrade(&self) -> WeakInstanceCell {
1866 WeakInstanceCell {
1867 inner: Arc::downgrade(&self.inner),
1868 }
1869 }
1870
1871 fn link(&self, child: InstanceCell) {
1873 assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1874 self.inner.children.insert(child.pid(), child);
1875 }
1876
1877 fn unlink(&self, child: &InstanceCell) {
1879 assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1880 self.inner.children.remove(&child.pid());
1881 }
1882
1883 fn unlink_all(&self) {
1885 self.inner.children.clear();
1886 }
1887
1888 fn maybe_link_parent(&self) {
1890 if let Some(parent) = self.inner.parent.upgrade() {
1891 parent.link(self.clone());
1892 }
1893 }
1894
1895 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1898 self.inner.maybe_unlink_parent()
1899 }
1900
1901 #[allow(dead_code)]
1903 fn get_parent_cell(&self) -> Option<InstanceCell> {
1904 self.inner.parent.upgrade()
1905 }
1906
1907 fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, Index, InstanceCell>> {
1910 self.inner.children.iter()
1911 }
1912
1913 fn child_count(&self) -> usize {
1915 self.inner.children.len()
1916 }
1917
1918 fn get_child(&self, pid: Index) -> Option<InstanceCell> {
1920 self.inner.children.get(&pid).map(|child| child.clone())
1921 }
1922
1923 pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &Ports<A>) -> ActorRef<R> {
1926 <R as Binds<A>>::bind(ports);
1927 ports.bind::<Signal>();
1929 ports.bind::<Undeliverable<MessageEnvelope>>();
1930 for entry in ports.bound.iter() {
1932 self.inner
1933 .exported_named_ports
1934 .insert(*entry.key(), entry.value());
1935 }
1936 ActorRef::attest(self.actor_id().clone())
1937 }
1938
1939 pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
1941 let ports = Arc::clone(&self.inner.ports).downcast::<Ports<A>>().ok()?;
1942 Some(ActorHandle::new(self.clone(), ports))
1943 }
1944}
1945
impl Drop for InstanceCellState {
    /// Detaches this instance from its parent (if still linked) and removes its
    /// entry from the proc's instance table, logging unexpected states.
    fn drop(&mut self) {
        // NOTE(review): a parent's `children` map holds strong `InstanceCell`s
        // (see `InstanceCell::link`), so finding a linked entry here presumably
        // means the pid slot held another cell; it is only logged at debug level.
        if let Some(parent) = self.maybe_unlink_parent() {
            tracing::debug!(
                "instance {} was dropped with parent {} still linked",
                self.actor_id,
                parent.actor_id()
            );
        }
        // `InstanceCell::new` always registers the cell, so a missing entry is
        // reported as an error.
        if self.proc.inner.instances.remove(&self.actor_id).is_none() {
            tracing::error!("instance {} was dropped but not in proc", self.actor_id);
        }
    }
}
1960
/// A non-owning reference to an [`InstanceCell`]'s state; upgrade it to access
/// the cell while it is still alive.
#[derive(Debug, Clone)]
pub struct WeakInstanceCell {
    inner: Weak<InstanceCellState>,
}
1967
1968impl Default for WeakInstanceCell {
1969 fn default() -> Self {
1970 Self::new()
1971 }
1972}
1973
1974impl WeakInstanceCell {
1975 pub fn new() -> Self {
1977 Self { inner: Weak::new() }
1978 }
1979
1980 pub fn upgrade(&self) -> Option<InstanceCell> {
1982 self.inner.upgrade().map(InstanceCell::wrap)
1983 }
1984}
1985
/// Per-actor registry of message ports, keyed by message type.
pub struct Ports<A: Actor> {
    /// `TypeId::of::<M>()` -> boxed `PortHandle<M>` (type-erased).
    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
    /// Bound port index (`M::port()`) -> message type name (`M::typename()`).
    bound: DashMap<u64, &'static str>,
    /// The actor's mailbox, used to open ports.
    mailbox: Mailbox,
    /// Sender for the work items that enqueue ports produce; presumably feeds the
    /// actor loop's `work_rx` — confirm at the construction site.
    workq: OrderedSender<WorkCell<A>>,
}
1996
/// Ordering metadata carried in message headers under [`SEQ_INFO`]; required
/// when the actor's work queue has buffering enabled. Its text form is
/// `session_id:seq` (see the `Display`/`FromStr` impls).
#[derive(Serialize, Deserialize, Clone, Named, AttrValue)]
pub struct SeqInfo {
    /// Identifies the ordered message stream the sequence number belongs to.
    pub session_id: Uuid,
    /// Position within the session; the ordered sender rejects `0`.
    pub seq: u64,
}
2005
2006impl fmt::Display for SeqInfo {
2007 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2008 write!(f, "{}:{}", self.session_id, self.seq)
2009 }
2010}
2011
2012impl std::str::FromStr for SeqInfo {
2013 type Err = anyhow::Error;
2014
2015 fn from_str(s: &str) -> Result<Self, Self::Err> {
2016 let parts: Vec<_> = s.split(':').collect();
2017 if parts.len() != 2 {
2018 return Err(anyhow::anyhow!("invalid SeqInfo: {}", s));
2019 }
2020 let session_id: Uuid = parts[0].parse()?;
2021 let seq: u64 = parts[1].parse()?;
2022 Ok(SeqInfo { session_id, seq })
2023 }
2024}
2025
// Header attribute carrying a message's `SeqInfo`; read by `Ports::get` when
// ordering messages on a buffering work queue.
declare_attrs! {
    pub attr SEQ_INFO: SeqInfo;
}
2031
impl<A: Actor> Ports<A> {
    /// Creates an empty registry whose enqueue ports push work into `workq`.
    fn new(mailbox: Mailbox, workq: OrderedSender<WorkCell<A>>) -> Self {
        Self {
            ports: DashMap::new(),
            bound: DashMap::new(),
            mailbox,
            workq,
        }
    }

    /// Returns the port handle for message type `M`, creating it on first use.
    ///
    /// A newly created port wraps each incoming message in a [`WorkCell`] that
    /// calls `Instance::handle_message`, bumps the queue-size gauge, and sends
    /// the work to `workq`: ordered by the message's [`SEQ_INFO`] header when
    /// `workq.enable_buffering` is set, directly otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `M` is [`Signal`] (that port is not provisioned here). The
    /// enqueue closure panics if buffering is enabled and a message arrives
    /// without a `SEQ_INFO` header.
    pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
    where
        A: Handler<M>,
    {
        let key = TypeId::of::<M>();
        match self.ports.entry(key) {
            Entry::Vacant(entry) => {
                assert_ne!(
                    key,
                    TypeId::of::<Signal>(),
                    "cannot provision Signal port through `Ports::get`"
                );

                let type_info = TypeInfo::get_by_typeid(key);
                let workq = self.workq.clone();
                let actor_id = self.mailbox.actor_id().to_string();
                let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
                    let seq_info = headers.get(SEQ_INFO).cloned();

                    let work = WorkCell::new(move |actor: &mut A, instance: &mut Instance<A>| {
                        Box::pin(async move {
                            // SAFETY: `type_info` was looked up from `TypeId::of::<M>()`,
                            // so it describes the type of `msg`, as `handle_message`
                            // requires.
                            unsafe {
                                instance
                                    .handle_message(actor, type_info, headers, msg)
                                    .await
                            }
                        })
                    });
                    // Counted at enqueue time; `Instance::run` decrements it when it
                    // dequeues work. NOTE(review): not decremented if the send below
                    // fails.
                    ACTOR_MESSAGE_QUEUE_SIZE.add(
                        1,
                        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.clone()),
                    );
                    if workq.enable_buffering {
                        let SeqInfo { session_id, seq } =
                            seq_info.expect("SEQ_INFO must be set when buffering is enabled");

                        // Normalize the ordered sender's error variants to anyhow.
                        workq.send(session_id, seq, work).map_err(|e| match e {
                            OrderedSenderError::InvalidZeroSeq(_) => {
                                anyhow::anyhow!("seq must be greater than 0")
                            }
                            OrderedSenderError::SendError(e) => anyhow::Error::from(e),
                            OrderedSenderError::FlushError(e) => e,
                        })
                    } else {
                        workq.direct_send(work).map_err(anyhow::Error::from)
                    }
                });
                entry.insert(Box::new(port.clone()));
                port
            }
            Entry::Occupied(entry) => {
                let port = entry.get();
                // Entries under `TypeId::of::<M>()` are always `PortHandle<M>`.
                port.downcast_ref::<PortHandle<M>>().unwrap().clone()
            }
        }
    }

    /// Opens a plain mailbox port for `M` (not routed through the work queue),
    /// registers its handle, and returns the handle together with its receiver.
    /// Returns `None` if a port for `M` already exists.
    pub(crate) fn open_message_port<M: Message>(&self) -> Option<(PortHandle<M>, PortReceiver<M>)> {
        match self.ports.entry(TypeId::of::<M>()) {
            Entry::Vacant(entry) => {
                let (port, receiver) = self.mailbox.open_port();
                entry.insert(Box::new(port.clone()));
                Some((port, receiver))
            }
            Entry::Occupied(_) => None,
        }
    }

    /// Binds `M`'s port as an actor port at index `M::port()` and records
    /// `M`'s type name. Binding the same type again is a no-op.
    ///
    /// # Panics
    ///
    /// Panics if that port index is already bound to a different type name.
    pub fn bind<M: RemoteMessage>(&self)
    where
        A: Handler<M>,
    {
        let port_index = M::port();
        match self.bound.entry(port_index) {
            Entry::Vacant(entry) => {
                self.get::<M>().bind_actor_port();
                entry.insert(M::typename());
            }
            Entry::Occupied(entry) => {
                assert_eq!(
                    *entry.get(),
                    M::typename(),
                    "bind {}: port index {} already bound to type {}",
                    M::typename(),
                    port_index,
                    entry.get(),
                );
            }
        }
    }
}
2141
2142#[cfg(test)]
2143mod tests {
2144 use std::assert_matches::assert_matches;
2145 use std::sync::atomic::AtomicBool;
2146
2147 use hyperactor_macros::export;
2148 use maplit::hashmap;
2149 use serde_json::json;
2150 use timed_test::async_timed_test;
2151 use tokio::sync::Barrier;
2152 use tokio::sync::oneshot;
2153 use tracing::Level;
2154 use tracing_subscriber::layer::SubscriberExt;
2155 use tracing_test::internal::logs_with_scope_contain;
2156
2157 use super::*;
2158 use crate as hyperactor;
2160 use crate::HandleClient;
2161 use crate::Handler;
2162 use crate::OncePortRef;
2163 use crate::PortRef;
2164 use crate::clock::RealClock;
2165 use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
2166 use crate::test_utils::process_assertion::assert_termination;
2167
2168 impl ActorTreeSnapshot {
2169 #[allow(dead_code)]
2170 fn empty(pid: Index) -> Self {
2171 Self {
2172 pid,
2173 type_name: String::new(),
2174 status: ActorStatus::Idle,
2175 stats: ActorStats::default(),
2176 handlers: HashMap::new(),
2177 children: HashMap::new(),
2178 events: Vec::new(),
2179 spans: Vec::new(),
2180 }
2181 }
2182
2183 fn empty_typed(pid: Index, type_name: String) -> Self {
2184 Self {
2185 pid,
2186 type_name,
2187 status: ActorStatus::Idle,
2188 stats: ActorStats::default(),
2189 handlers: HashMap::new(),
2190 children: HashMap::new(),
2191 events: Vec::new(),
2192 spans: Vec::new(),
2193 }
2194 }
2195 }
2196
2197 #[derive(Debug, Default, Actor)]
2198 #[export]
2199 struct TestActor;
2200
2201 #[derive(Handler, HandleClient, Debug)]
2202 enum TestActorMessage {
2203 Reply(oneshot::Sender<()>),
2204 Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
2205 Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
2206 Noop(),
2207 Fail(anyhow::Error),
2208 Panic(String),
2209 Spawn(oneshot::Sender<ActorHandle<TestActor>>),
2210 }
2211
2212 impl TestActor {
2213 async fn spawn_child(parent: &ActorHandle<TestActor>) -> ActorHandle<TestActor> {
2214 let (tx, rx) = oneshot::channel();
2215 parent.send(TestActorMessage::Spawn(tx)).unwrap();
2216 rx.await.unwrap()
2217 }
2218 }
2219
2220 #[async_trait]
2221 #[crate::forward(TestActorMessage)]
2222 impl TestActorMessageHandler for TestActor {
2223 async fn reply(
2224 &mut self,
2225 _cx: &crate::Context<Self>,
2226 sender: oneshot::Sender<()>,
2227 ) -> Result<(), anyhow::Error> {
2228 sender.send(()).unwrap();
2229 Ok(())
2230 }
2231
2232 async fn wait(
2233 &mut self,
2234 _cx: &crate::Context<Self>,
2235 sender: oneshot::Sender<()>,
2236 receiver: oneshot::Receiver<()>,
2237 ) -> Result<(), anyhow::Error> {
2238 sender.send(()).unwrap();
2239 receiver.await.unwrap();
2240 Ok(())
2241 }
2242
2243 async fn forward(
2244 &mut self,
2245 _cx: &crate::Context<Self>,
2246 destination: ActorHandle<TestActor>,
2247 message: Box<TestActorMessage>,
2248 ) -> Result<(), anyhow::Error> {
2249 destination.send(*message)?;
2251 Ok(())
2252 }
2253
2254 async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
2255 Ok(())
2256 }
2257
2258 async fn fail(
2259 &mut self,
2260 _cx: &crate::Context<Self>,
2261 err: anyhow::Error,
2262 ) -> Result<(), anyhow::Error> {
2263 Err(err)
2264 }
2265
2266 async fn panic(
2267 &mut self,
2268 _cx: &crate::Context<Self>,
2269 err_msg: String,
2270 ) -> Result<(), anyhow::Error> {
2271 panic!("{}", err_msg);
2272 }
2273
2274 async fn spawn(
2275 &mut self,
2276 cx: &crate::Context<Self>,
2277 reply: oneshot::Sender<ActorHandle<TestActor>>,
2278 ) -> Result<(), anyhow::Error> {
2279 let handle = <Self as Actor>::spawn(cx, ()).await?;
2280 reply.send(handle).unwrap();
2281 Ok(())
2282 }
2283 }
2284
2285 #[tracing_test::traced_test]
2286 #[async_timed_test(timeout_secs = 30)]
2287 async fn test_spawn_actor() {
2288 let proc = Proc::local();
2289 let handle = proc.spawn::<TestActor>("test", ()).await.unwrap();
2290
2291 assert!(logs_contain(
2293 format!(
2294 "{}: spawned with {:?}",
2295 handle.actor_id(),
2296 handle.cell().actor_task_handle().unwrap(),
2297 )
2298 .as_str()
2299 ));
2300
2301 let mut state = handle.status().clone();
2302
2303 let (tx, rx) = oneshot::channel::<()>();
2306 handle.send(TestActorMessage::Reply(tx)).unwrap();
2307 rx.await.unwrap();
2308
2309 state
2310 .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
2311 .await
2312 .unwrap();
2313
2314 let (enter_tx, enter_rx) = oneshot::channel::<()>();
2316 let (exit_tx, exit_rx) = oneshot::channel::<()>();
2317
2318 handle
2319 .send(TestActorMessage::Wait(enter_tx, exit_rx))
2320 .unwrap();
2321 enter_rx.await.unwrap();
2322 assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= RealClock.system_time_now());
2323 exit_tx.send(()).unwrap();
2324
2325 state
2326 .wait_for(|state| matches!(*state, ActorStatus::Idle))
2327 .await
2328 .unwrap();
2329
2330 handle.drain_and_stop().unwrap();
2331 handle.await;
2332 assert_matches!(*state.borrow(), ActorStatus::Stopped);
2333 }
2334
2335 #[async_timed_test(timeout_secs = 30)]
2336 async fn test_proc_actors_messaging() {
2337 let proc = Proc::local();
2338 let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
2339 let second = proc.spawn::<TestActor>("second", ()).await.unwrap();
2340 let (tx, rx) = oneshot::channel::<()>();
2341 let reply_message = TestActorMessage::Reply(tx);
2342 first
2343 .send(TestActorMessage::Forward(second, Box::new(reply_message)))
2344 .unwrap();
2345 rx.await.unwrap();
2346 }
2347
2348 #[derive(Debug, Default, Actor)]
2349 struct LookupTestActor;
2350
2351 #[derive(Handler, HandleClient, Debug)]
2352 enum LookupTestMessage {
2353 ActorExists(ActorRef<TestActor>, #[reply] OncePortRef<bool>),
2354 }
2355
2356 #[async_trait]
2357 #[crate::forward(LookupTestMessage)]
2358 impl LookupTestMessageHandler for LookupTestActor {
2359 async fn actor_exists(
2360 &mut self,
2361 cx: &crate::Context<Self>,
2362 actor_ref: ActorRef<TestActor>,
2363 ) -> Result<bool, anyhow::Error> {
2364 Ok(actor_ref.downcast_handle(cx).is_some())
2365 }
2366 }
2367
2368 #[async_timed_test(timeout_secs = 30)]
2369 async fn test_actor_lookup() {
2370 let proc = Proc::local();
2371 let (client, _handle) = proc.instance("client").unwrap();
2372
2373 let target_actor = proc.spawn::<TestActor>("target", ()).await.unwrap();
2374 let target_actor_ref = target_actor.bind();
2375 let lookup_actor = proc.spawn::<LookupTestActor>("lookup", ()).await.unwrap();
2376
2377 assert!(
2378 lookup_actor
2379 .actor_exists(&client, target_actor_ref.clone())
2380 .await
2381 .unwrap()
2382 );
2383
2384 assert!(
2386 !lookup_actor
2387 .actor_exists(
2388 &client,
2389 ActorRef::attest(target_actor.actor_id().child_id(123).clone())
2390 )
2391 .await
2392 .unwrap()
2393 );
2394 assert!(
2396 !lookup_actor
2397 .actor_exists(&client, ActorRef::attest(lookup_actor.actor_id().clone()))
2398 .await
2399 .unwrap()
2400 );
2401
2402 target_actor.drain_and_stop().unwrap();
2403 target_actor.await;
2404
2405 assert!(
2406 !lookup_actor
2407 .actor_exists(&client, target_actor_ref)
2408 .await
2409 .unwrap()
2410 );
2411
2412 lookup_actor.drain_and_stop().unwrap();
2413 lookup_actor.await;
2414 }
2415
2416 fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
2417 assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id());
2418 assert_eq!(
2419 child.inner.parent.upgrade().unwrap().actor_id(),
2420 parent.actor_id()
2421 );
2422 assert_matches!(
2423 parent.inner.children.get(&child.pid()),
2424 Some(node) if node.actor_id() == child.actor_id()
2425 );
2426 }
2427
2428 #[tracing_test::traced_test]
2429 #[async_timed_test(timeout_secs = 30)]
2430 async fn test_spawn_child() {
2431 let proc = Proc::local();
2432
2433 let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
2434 let second = TestActor::spawn_child(&first).await;
2435 let third = TestActor::spawn_child(&second).await;
2436
2437 assert!(logs_with_scope_contain(
2439 "hyperactor::proc",
2440 format!(
2441 "{}: spawned with {:?}",
2442 first.actor_id(),
2443 first.cell().actor_task_handle().unwrap()
2444 )
2445 .as_str()
2446 ));
2447 assert!(logs_with_scope_contain(
2448 "hyperactor::proc",
2449 format!(
2450 "{}: spawned with {:?}",
2451 second.actor_id(),
2452 second.cell().actor_task_handle().unwrap()
2453 )
2454 .as_str()
2455 ));
2456 assert!(logs_with_scope_contain(
2457 "hyperactor::proc",
2458 format!(
2459 "{}: spawned with {:?}",
2460 third.actor_id(),
2461 third.cell().actor_task_handle().unwrap()
2462 )
2463 .as_str()
2464 ));
2465
2466 assert_eq!(first.actor_id().proc_id(), proc.proc_id());
2468 assert_eq!(second.actor_id(), &first.actor_id().child_id(1));
2469 assert_eq!(third.actor_id(), &first.actor_id().child_id(2));
2470
2471 validate_link(third.cell(), second.cell());
2473 validate_link(second.cell(), first.cell());
2474 assert!(first.cell().inner.parent.upgrade().is_none());
2475
2476 let third_cell = third.cell().clone();
2479 third.drain_and_stop().unwrap();
2480 third.await;
2481 assert!(third_cell.inner.children.is_empty());
2482 drop(third_cell);
2483 validate_link(second.cell(), first.cell());
2484
2485 let second_cell = second.cell().clone();
2486 second.drain_and_stop().unwrap();
2487 second.await;
2488 assert!(second_cell.inner.children.is_empty());
2489 drop(second_cell);
2490
2491 let first_cell = first.cell().clone();
2492 first.drain_and_stop().unwrap();
2493 first.await;
2494 assert!(first_cell.inner.children.is_empty());
2495 }
2496
2497 #[async_timed_test(timeout_secs = 30)]
2498 async fn test_child_lifecycle() {
2499 let proc = Proc::local();
2500
2501 let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2502 let root_1 = TestActor::spawn_child(&root).await;
2503 let root_2 = TestActor::spawn_child(&root).await;
2504 let root_2_1 = TestActor::spawn_child(&root_2).await;
2505
2506 root.drain_and_stop().unwrap();
2507 root.await;
2508
2509 for actor in [root_1, root_2, root_2_1] {
2510 assert!(actor.send(TestActorMessage::Noop()).is_err());
2511 assert_matches!(actor.await, ActorStatus::Stopped);
2512 }
2513 }
2514
2515 #[async_timed_test(timeout_secs = 30)]
2516 async fn test_parent_failure() {
2517 let proc = Proc::local();
2518 ProcSupervisionCoordinator::set(&proc).await.unwrap();
2521
2522 let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2523 let root_1 = TestActor::spawn_child(&root).await;
2524 let root_2 = TestActor::spawn_child(&root).await;
2525 let root_2_1 = TestActor::spawn_child(&root_2).await;
2526
2527 root_2
2528 .send(TestActorMessage::Fail(anyhow::anyhow!(
2529 "some random failure"
2530 )))
2531 .unwrap();
2532 let _root_2_actor_id = root_2.actor_id().clone();
2533 assert_matches!(
2534 root_2.await,
2535 ActorStatus::Failed(err) if err.to_string() == "some random failure"
2536 );
2537
2538 assert_matches!(
2542 root.await,
2543 ActorStatus::Failed(err) if err.to_string().contains("some random failure")
2544 );
2545 assert_eq!(root_2_1.await, ActorStatus::Stopped);
2546 assert_eq!(root_1.await, ActorStatus::Stopped);
2547 }
2548
2549 #[async_timed_test(timeout_secs = 30)]
2550 async fn test_actor_ledger() {
2551 async fn wait_until_idle(actor_handle: &ActorHandle<TestActor>) {
2552 actor_handle
2553 .status()
2554 .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
2555 .await
2556 .unwrap();
2557 }
2558
2559 let proc = Proc::local();
2560
2561 let root: ActorHandle<TestActor> = proc.spawn::<TestActor>("root", ()).await.unwrap();
2563 wait_until_idle(&root).await;
2564 {
2565 let snapshot = proc.state().ledger.snapshot();
2566 assert_eq!(
2567 snapshot.roots,
2568 hashmap! {
2569 root.actor_id().clone() =>
2570 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2571 },
2572 );
2573 }
2574
2575 let another_root: ActorHandle<TestActor> =
2577 proc.spawn::<TestActor>("another_root", ()).await.unwrap();
2578 wait_until_idle(&another_root).await;
2579 {
2580 let snapshot = proc.state().ledger.snapshot();
2581 assert_eq!(
2582 snapshot.roots,
2583 hashmap! {
2584 root.actor_id().clone() =>
2585 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2586 another_root.actor_id().clone() =>
2587 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2588 },
2589 );
2590 }
2591
2592 another_root.drain_and_stop().unwrap();
2595 another_root.await;
2596 {
2597 let snapshot = proc.state().ledger.snapshot();
2598 assert_eq!(
2599 snapshot.roots,
2600 hashmap! { root.actor_id().clone() =>
2601 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2602 },
2603 );
2604 }
2605
2606 let root_1 = TestActor::spawn_child(&root).await;
2612 wait_until_idle(&root_1).await;
2613 wait_until_idle(&root).await;
2615 {
2616 let snapshot = proc.state().ledger.snapshot();
2617 assert_eq!(
2618 snapshot.roots,
2619 hashmap! {
2620 root.actor_id().clone() => ActorTreeSnapshot {
2621 pid: 0,
2622 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2623 status: ActorStatus::Idle,
2624 stats: ActorStats { num_processed_messages: 1 },
2625 handlers: HashMap::new(),
2626 children: hashmap! {
2627 root_1.actor_id().pid() =>
2628 ActorTreeSnapshot::empty_typed(
2629 root_1.actor_id().pid(),
2630 "hyperactor::proc::tests::TestActor".to_string()
2631 )
2632 },
2633 events: Vec::new(),
2634 spans: Vec::new(),
2635 }
2636 },
2637 );
2638 }
2639
2640 let root_1_1 = TestActor::spawn_child(&root_1).await;
2641 wait_until_idle(&root_1_1).await;
2642 wait_until_idle(&root_1).await;
2643 {
2644 let snapshot = proc.state().ledger.snapshot();
2645 assert_eq!(
2646 snapshot.roots,
2647 hashmap! {
2648 root.actor_id().clone() => ActorTreeSnapshot {
2649 pid: 0,
2650 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2651 status: ActorStatus::Idle,
2652 stats: ActorStats { num_processed_messages: 1 },
2653 handlers: HashMap::new(),
2654 children: hashmap!{
2655 root_1.actor_id().pid() =>
2656 ActorTreeSnapshot {
2657 pid: root_1.actor_id().pid(),
2658 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2659 status: ActorStatus::Idle,
2660 stats: ActorStats { num_processed_messages: 1 },
2661 handlers: HashMap::new(),
2662 children: hashmap!{
2663 root_1_1.actor_id().pid() =>
2664 ActorTreeSnapshot::empty_typed(
2665 root_1_1.actor_id().pid(),
2666 "hyperactor::proc::tests::TestActor".to_string()
2667 )
2668 },
2669 events: Vec::new(),
2670 spans: Vec::new(),
2671 }
2672 },
2673 events: Vec::new(),
2674 spans: Vec::new(),
2675 },
2676 }
2677 );
2678 }
2679
2680 let root_2 = TestActor::spawn_child(&root).await;
2681 wait_until_idle(&root_2).await;
2682 wait_until_idle(&root).await;
2683 {
2684 let snapshot = proc.state().ledger.snapshot();
2685 assert_eq!(
2686 snapshot.roots,
2687 hashmap! {
2688 root.actor_id().clone() => ActorTreeSnapshot {
2689 pid: 0,
2690 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2691 status: ActorStatus::Idle,
2692 stats: ActorStats { num_processed_messages: 2 },
2693 handlers: HashMap::new(),
2694 children: hashmap!{
2695 root_2.actor_id().pid() =>
2696 ActorTreeSnapshot{
2697 pid: root_2.actor_id().pid(),
2698 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2699 status: ActorStatus::Idle,
2700 stats: ActorStats::default(),
2701 handlers: HashMap::new(),
2702 children: HashMap::new(),
2703 events: Vec::new(),
2704 spans: Vec::new(),
2705 },
2706 root_1.actor_id().pid() =>
2707 ActorTreeSnapshot{
2708 pid: root_1.actor_id().pid(),
2709 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2710 status: ActorStatus::Idle,
2711 stats: ActorStats { num_processed_messages: 1 },
2712 handlers: HashMap::new(),
2713 children: hashmap!{
2714 root_1_1.actor_id().pid() =>
2715 ActorTreeSnapshot::empty_typed(
2716 root_1_1.actor_id().pid(),
2717 "hyperactor::proc::tests::TestActor".to_string()
2718 )
2719 },
2720 events: Vec::new(),
2721 spans: Vec::new(),
2722 },
2723 },
2724 events: Vec::new(),
2725 spans: Vec::new(),
2726 },
2727 }
2728 );
2729 }
2730
2731 root_1.drain_and_stop().unwrap();
2733 root_1.await;
2734 wait_until_idle(&root).await;
2736 {
2737 let snapshot = proc.state().ledger.snapshot();
2738 assert_eq!(
2739 snapshot.roots,
2740 hashmap! {
2741 root.actor_id().clone() => ActorTreeSnapshot {
2742 pid: 0,
2743 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2744 status: ActorStatus::Idle,
2745 stats: ActorStats { num_processed_messages: 3 },
2746 handlers: HashMap::new(),
2747 children: hashmap!{
2748 root_2.actor_id().pid() =>
2749 ActorTreeSnapshot {
2750 pid: root_2.actor_id().pid(),
2751 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2752 status: ActorStatus::Idle,
2753 stats: ActorStats::default(),
2754 handlers: HashMap::new(),
2755 children: HashMap::new(),
2756 events: Vec::new(),
2757 spans: Vec::new(),
2758 }
2759 },
2760 events: Vec::new(),
2761 spans: Vec::new(),
2762 },
2763 }
2764 );
2765 }
2766
2767 root.drain_and_stop().unwrap();
2769 root.await;
2770 {
2771 let snapshot = proc.state().ledger.snapshot();
2772 assert_eq!(snapshot.roots, hashmap! {});
2773 }
2774 }
2775
2776 #[async_timed_test(timeout_secs = 30)]
2777 async fn test_multi_handler() {
2778 #[derive(Debug)]
2782 struct TestActor(Arc<AtomicUsize>);
2783
2784 #[async_trait]
2785 impl Actor for TestActor {
2786 type Params = Arc<AtomicUsize>;
2787
2788 async fn new(param: Arc<AtomicUsize>) -> Result<Self, anyhow::Error> {
2789 Ok(Self(param))
2790 }
2791 }
2792
2793 #[async_trait]
2794 impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
2795 async fn handle(
2796 &mut self,
2797 cx: &crate::Context<Self>,
2798 message: OncePortHandle<PortHandle<usize>>,
2799 ) -> anyhow::Result<()> {
2800 message.send(cx.port())?;
2801 Ok(())
2802 }
2803 }
2804
2805 #[async_trait]
2806 impl Handler<usize> for TestActor {
2807 async fn handle(
2808 &mut self,
2809 _cx: &crate::Context<Self>,
2810 message: usize,
2811 ) -> anyhow::Result<()> {
2812 self.0.fetch_add(message, Ordering::SeqCst);
2813 Ok(())
2814 }
2815 }
2816
2817 let proc = Proc::local();
2818 let state = Arc::new(AtomicUsize::new(0));
2819 let handle = proc
2820 .spawn::<TestActor>("test", state.clone())
2821 .await
2822 .unwrap();
2823 let client = proc.attach("client").unwrap();
2824 let (tx, rx) = client.open_once_port();
2825 handle.send(tx).unwrap();
2826 let usize_handle = rx.recv().await.unwrap();
2827 usize_handle.send(123).unwrap();
2828
2829 handle.drain_and_stop().unwrap();
2830 handle.await;
2831
2832 assert_eq!(state.load(Ordering::SeqCst), 123);
2833 }
2834
2835 #[async_timed_test(timeout_secs = 30)]
2836 async fn test_actor_panic() {
2837 panic_handler::set_panic_hook();
2839
2840 let proc = Proc::local();
2841 ProcSupervisionCoordinator::set(&proc).await.unwrap();
2844
2845 let (client, _handle) = proc.instance("client").unwrap();
2846 let actor_handle = proc.spawn::<TestActor>("test", ()).await.unwrap();
2847 actor_handle
2848 .panic(&client, "some random failure".to_string())
2849 .await
2850 .unwrap();
2851 let actor_status = actor_handle.await;
2852
2853 assert_matches!(actor_status, ActorStatus::Failed(_));
2857 if let ActorStatus::Failed(err) = actor_status {
2858 let error_msg = err.to_string();
2859 assert!(error_msg.contains("some random failure"));
2861 assert!(error_msg.contains("rust_begin_unwind"));
2865 }
2866 }
2867
2868 #[async_timed_test(timeout_secs = 30)]
2869 async fn test_local_supervision_propagation() {
2870 #[derive(Debug)]
2871 struct TestActor(Arc<AtomicBool>, bool);
2872
2873 #[async_trait]
2874 impl Actor for TestActor {
2875 type Params = (Arc<AtomicBool>, bool);
2876
2877 async fn new(param: (Arc<AtomicBool>, bool)) -> Result<Self, anyhow::Error> {
2878 Ok(Self(param.0, param.1))
2879 }
2880
2881 async fn handle_supervision_event(
2882 &mut self,
2883 _this: &Instance<Self>,
2884 _event: &ActorSupervisionEvent,
2885 ) -> Result<bool, anyhow::Error> {
2886 if !self.1 {
2887 return Ok(false);
2888 }
2889
2890 tracing::error!(
2891 "{}: supervision event received: {:?}",
2892 _this.self_id(),
2893 _event
2894 );
2895 self.0.store(true, Ordering::SeqCst);
2896 Ok(true)
2897 }
2898 }
2899
2900 #[async_trait]
2901 impl Handler<String> for TestActor {
2902 async fn handle(
2903 &mut self,
2904 cx: &crate::Context<Self>,
2905 message: String,
2906 ) -> anyhow::Result<()> {
2907 tracing::info!("{} received message: {}", cx.self_id(), message);
2908 Err(anyhow::anyhow!(message))
2909 }
2910 }
2911
2912 let proc = Proc::local();
2913 let reported_event = ProcSupervisionCoordinator::set(&proc).await.unwrap();
2914
2915 let root_state = Arc::new(AtomicBool::new(false));
2916 let root_1_state = Arc::new(AtomicBool::new(false));
2917 let root_1_1_state = Arc::new(AtomicBool::new(false));
2918 let root_1_1_1_state = Arc::new(AtomicBool::new(false));
2919 let root_2_state = Arc::new(AtomicBool::new(false));
2920 let root_2_1_state = Arc::new(AtomicBool::new(false));
2921
2922 let root = proc
2923 .spawn::<TestActor>("root", (root_state.clone(), false))
2924 .await
2925 .unwrap();
2926 let root_1 = proc
2927 .spawn_child::<TestActor>(
2928 root.cell().clone(),
2929 (
2930 root_1_state.clone(),
2931 true, ),
2933 )
2934 .await
2935 .unwrap();
2936 let root_1_1 = proc
2937 .spawn_child::<TestActor>(root_1.cell().clone(), (root_1_1_state.clone(), false))
2938 .await
2939 .unwrap();
2940 let root_1_1_1 = proc
2941 .spawn_child::<TestActor>(root_1_1.cell().clone(), (root_1_1_1_state.clone(), false))
2942 .await
2943 .unwrap();
2944 let root_2 = proc
2945 .spawn_child::<TestActor>(root.cell().clone(), (root_2_state.clone(), false))
2946 .await
2947 .unwrap();
2948 let root_2_1 = proc
2949 .spawn_child::<TestActor>(root_2.cell().clone(), (root_2_1_state.clone(), false))
2950 .await
2951 .unwrap();
2952
2953 root_1_1_1
2956 .send::<String>("some random failure".into())
2957 .unwrap();
2958
2959 root_2_1
2962 .send::<String>("some random failure".into())
2963 .unwrap();
2964
2965 RealClock.sleep(Duration::from_secs(1)).await;
2966
2967 assert!(!root_state.load(Ordering::SeqCst));
2968 assert!(root_1_state.load(Ordering::SeqCst));
2969 assert!(!root_1_1_state.load(Ordering::SeqCst));
2970 assert!(!root_1_1_1_state.load(Ordering::SeqCst));
2971 assert!(!root_2_state.load(Ordering::SeqCst));
2972 assert!(!root_2_1_state.load(Ordering::SeqCst));
2973 assert_eq!(
2974 reported_event.event().map(|e| e.actor_id.clone()),
2975 Some(root_2_1.actor_id().clone())
2976 );
2977 }
2978
2979 #[async_timed_test(timeout_secs = 30)]
2980 async fn test_instance() {
2981 #[derive(Debug, Default, Actor)]
2982 struct TestActor;
2983
2984 #[async_trait]
2985 impl Handler<(String, PortRef<String>)> for TestActor {
2986 async fn handle(
2987 &mut self,
2988 cx: &crate::Context<Self>,
2989 (message, port): (String, PortRef<String>),
2990 ) -> anyhow::Result<()> {
2991 port.send(cx, message)?;
2992 Ok(())
2993 }
2994 }
2995
2996 let proc = Proc::local();
2997
2998 let (instance, handle) = proc.instance("my_test_actor").unwrap();
2999
3000 let child_actor = TestActor::spawn(&instance, ()).await.unwrap();
3001
3002 let (port, mut receiver) = instance.open_port();
3003 child_actor
3004 .send(("hello".to_string(), port.bind()))
3005 .unwrap();
3006
3007 let message = receiver.recv().await.unwrap();
3008 assert_eq!(message, "hello");
3009
3010 child_actor.drain_and_stop().unwrap();
3011 child_actor.await;
3012
3013 assert_eq!(*handle.status().borrow(), ActorStatus::Client);
3014 drop(instance);
3015 assert_eq!(*handle.status().borrow(), ActorStatus::Stopped);
3016 handle.await;
3017 }
3018
3019 #[tokio::test]
3020 async fn test_proc_terminate_without_coordinator() {
3021 if std::env::var("CARGO_TEST").is_ok() {
3022 eprintln!("test skipped as it hangs when run by cargo in sandcastle");
3023 return;
3024 }
3025
3026 let process = async {
3027 let proc = Proc::local();
3028 let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
3032 let (client, _handle) = proc.instance("client").unwrap();
3033 root.fail(&client, anyhow::anyhow!("some random failure"))
3034 .await
3035 .unwrap();
3036 RealClock.sleep(Duration::from_secs(30)).await;
3040 };
3041
3042 assert_termination(|| process, 1).await.unwrap();
3043 }
3044
3045 fn trace_and_block(fut: impl Future) {
3046 tracing::subscriber::with_default(
3047 tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
3048 || {
3049 tokio::runtime::Builder::new_current_thread()
3050 .enable_all()
3051 .build()
3052 .unwrap()
3053 .block_on(fut)
3054 },
3055 );
3056 }
3057
#[ignore = "until trace recording is turned back on"]
#[test]
fn test_handler_logging() {
    /// Actor whose handlers emit tracing events and spans, so the per-actor
    /// recording can be inspected.
    #[derive(Debug, Default, Actor)]
    struct LoggingActor;

    impl LoggingActor {
        /// Sends a two-party barrier to the actor and waits on it. The intent is
        /// that everything queued earlier has been handled once this returns.
        /// That presumes in-order message handling.
        async fn wait(handle: &ActorHandle<Self>) {
            let rendezvous = Arc::new(Barrier::new(2));
            handle.send(Arc::clone(&rendezvous)).unwrap();
            rendezvous.wait().await;
        }
    }

    // Log the string as an event message.
    #[async_trait]
    impl Handler<String> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: String,
        ) -> anyhow::Result<()> {
            tracing::info!("{}", message);
            Ok(())
        }
    }

    // Record the number as a structured `number` field.
    #[async_trait]
    impl Handler<u64> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: u64,
        ) -> anyhow::Result<()> {
            tracing::event!(Level::INFO, number = message);
            Ok(())
        }
    }

    // Meet the sender at the barrier.
    #[async_trait]
    impl Handler<Arc<Barrier>> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: Arc<Barrier>,
        ) -> anyhow::Result<()> {
            message.wait().await;
            Ok(())
        }
    }

    // Park inside `child_span` between two barriers so the test can sample
    // the span stacks mid-handler.
    #[async_trait]
    impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            barriers: Arc<(Barrier, Barrier)>,
        ) -> anyhow::Result<()> {
            let span = tracing::span!(Level::INFO, "child_span");
            // NOTE(review): the enter guard is deliberately held across both
            // `.await`s so the span is still entered when `stacks()` is
            // sampled. Holding `Span::enter` across awaits is normally
            // discouraged; confirm this is what the recorder expects.
            let _entered = span.enter();
            barriers.0.wait().await;
            barriers.1.wait().await;
            Ok(())
        }
    }

    trace_and_block(async {
        let actor = LoggingActor::spawn_detached(()).await.unwrap();
        for text in ["hello world", "hello world again"] {
            actor.send(text.to_string()).unwrap();
        }
        actor.send(123u64).unwrap();

        // Make sure the three messages above are handled before reading the recording.
        LoggingActor::wait(&actor).await;

        let events = actor.cell().inner.recording.tail();
        let expected = [
            json!({ "message": "hello world" }),
            json!({ "message": "hello world again" }),
            json!({ "number": 123 }),
        ];
        assert_eq!(events.len(), expected.len());
        for (idx, want) in expected.into_iter().enumerate() {
            assert_eq!(events[idx].json_value(), want);
        }

        // Hold the actor inside `child_span`, sample the stacks, then release it.
        let gates = Arc::new((Barrier::new(2), Barrier::new(2)));
        actor.send(Arc::clone(&gates)).unwrap();
        gates.0.wait().await;
        let stacks = actor.cell().inner.recording.stacks();
        gates.1.wait().await;

        assert_eq!(stacks.len(), 1);
        assert_eq!(stacks[0].len(), 1);
        assert_eq!(stacks[0][0].name(), "child_span");
    })
}
3153}