1use std::any::Any;
15use std::any::TypeId;
16use std::collections::HashMap;
17use std::fmt;
18use std::future::Future;
19use std::hash::Hash;
20use std::hash::Hasher;
21use std::ops::Deref;
22use std::panic;
23use std::panic::AssertUnwindSafe;
24use std::panic::Location;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::sync::OnceLock;
28use std::sync::Weak;
29use std::sync::atomic::AtomicU64;
30use std::sync::atomic::AtomicUsize;
31use std::sync::atomic::Ordering;
32use std::time::Duration;
33use std::time::SystemTime;
34
35use async_trait::async_trait;
36use dashmap::DashMap;
37use dashmap::mapref::entry::Entry;
38use dashmap::mapref::multiple::RefMulti;
39use futures::FutureExt;
40use hyperactor_config::AttrValue;
41use hyperactor_config::attrs::Attrs;
42use hyperactor_config::attrs::declare_attrs;
43use hyperactor_telemetry::recorder;
44use hyperactor_telemetry::recorder::Recording;
45use serde::Deserialize;
46use serde::Serialize;
47use tokio::sync::mpsc;
48use tokio::sync::watch;
49use tokio::task::JoinHandle;
50use tracing::Instrument;
51use tracing::Level;
52use tracing::Span;
53use typeuri::Named as _;
54use uuid::Uuid;
55use wirevalue::TypeInfo;
56
57use crate as hyperactor;
58use crate::Actor;
59use crate::ActorRef;
60use crate::Handler;
61use crate::Message;
62use crate::RemoteMessage;
63use crate::actor::ActorError;
64use crate::actor::ActorErrorKind;
65use crate::actor::ActorHandle;
66use crate::actor::ActorStatus;
67use crate::actor::Binds;
68use crate::actor::Referable;
69use crate::actor::RemoteHandles;
70use crate::actor::Signal;
71use crate::channel;
72use crate::channel::ChannelAddr;
73use crate::channel::ChannelError;
74use crate::clock::Clock;
75use crate::clock::ClockKind;
76use crate::clock::RealClock;
77use crate::config;
78use crate::context;
79use crate::mailbox::BoxedMailboxSender;
80use crate::mailbox::DeliveryError;
81use crate::mailbox::DialMailboxRouter;
82use crate::mailbox::IntoBoxedMailboxSender as _;
83use crate::mailbox::Mailbox;
84use crate::mailbox::MailboxMuxer;
85use crate::mailbox::MailboxSender;
86use crate::mailbox::MailboxServer as _;
87use crate::mailbox::MessageEnvelope;
88use crate::mailbox::OncePortHandle;
89use crate::mailbox::OncePortReceiver;
90use crate::mailbox::PanickingMailboxSender;
91use crate::mailbox::PortHandle;
92use crate::mailbox::PortReceiver;
93use crate::mailbox::Undeliverable;
94use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
95use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
96use crate::metrics::ACTOR_MESSAGES_RECEIVED;
97use crate::ordering::OrderedSender;
98use crate::ordering::OrderedSenderError;
99use crate::ordering::Sequencer;
100use crate::ordering::ordered_channel;
101use crate::panic_handler;
102use crate::reference::ActorId;
103use crate::reference::Index;
104use crate::reference::PortId;
105use crate::reference::ProcId;
106use crate::reference::id;
107use crate::supervision::ActorSupervisionEvent;
108
/// Process-wide counter from which [`Proc::local`] draws the rank of each
/// `local` proc it creates, so successive local procs get distinct IDs.
static NEXT_LOCAL_RANK: AtomicUsize = AtomicUsize::new(0);
111
/// A proc hosts a set of actors: it allocates their IDs, spawns them, and
/// routes messages addressed to them, forwarding everything else.
///
/// `Proc` is a cheap handle; all clones share the same underlying state.
#[derive(Clone)]
pub struct Proc {
    /// Shared proc state; the last strong reference dropping tears it down.
    inner: Arc<ProcState>,
}
122
123impl fmt::Debug for Proc {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 f.debug_struct("Proc")
126 .field("proc_id", &self.inner.proc_id)
127 .finish()
128 }
129}
130
/// State shared by every clone of a [`Proc`].
///
/// Note: fields are dropped in declaration order (after `Drop::drop` runs).
struct ProcState {
    /// This proc's identity. Envelopes whose destination actor lives on this
    /// proc ID are delivered locally; all others go to `forwarder`.
    proc_id: ProcId,

    /// Mailboxes bound on this proc; locally-addressed envelopes are posted here.
    proc_muxer: MailboxMuxer,

    /// Sender used for envelopes addressed to other procs.
    forwarder: BoxedMailboxSender,

    /// Root actor name -> next child pid to hand out under that root.
    /// Initialized to 1 because the root itself has pid 0.
    roots: DashMap<String, AtomicUsize>,

    /// Weak references to root actor instances, used for snapshots, stopping,
    /// and aborting.
    ledger: ActorLedger,

    /// Weak references to actor instances keyed by ID, looked up by
    /// `Proc::resolve_actor_ref`. NOTE(review): populated outside this view
    /// (presumably by `InstanceCell`) — confirm.
    instances: DashMap<ActorId, WeakInstanceCell>,

    /// Port to which supervision events reaching this proc are forwarded
    /// (see `Proc::handle_supervision_event`); can be set only once.
    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,

    /// Clock used by this proc, e.g. for delayed self-messages.
    clock: ClockKind,
}
159
160impl Drop for ProcState {
161 fn drop(&mut self) {
162 tracing::info!(
166 proc_id = %self.proc_id,
167 name = "ProcStatus",
168 status = "Dropped"
169 );
170 }
171}
172
/// A point-in-time view of a proc's live root actor trees, as returned by
/// `Proc::ledger_snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActorLedgerSnapshot {
    /// Snapshot of each root actor (with its descendants), keyed by the root's
    /// ID. Roots whose instance has already been dropped are omitted.
    pub roots: HashMap<ActorId, ActorTreeSnapshot>,
}
180
/// A serializable copy of an event from an actor's telemetry recording.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Event {
    /// Timestamp of the event, as reported by the recorder.
    pub time: SystemTime,
    /// The event's recorded fields as name/value pairs.
    pub fields: Vec<(String, recorder::Value)>,
    /// Sequence number assigned to the event by the recorder.
    pub seq: usize,
}
191
192impl From<recorder::Event> for Event {
193 fn from(event: recorder::Event) -> Event {
194 Event {
195 time: event.time,
196 fields: event.fields(),
197 seq: event.seq,
198 }
199 }
200}
201
/// A snapshot of one actor and, recursively, its children.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActorTreeSnapshot {
    /// The actor's pid, as reported by `ActorId::pid`.
    pub pid: Index,

    /// The actor's type name: the registered type name when one exists,
    /// otherwise (presumably) the Rust type name.
    pub type_name: String,

    /// The actor's status at snapshot time.
    pub status: ActorStatus,

    /// Counters for this actor, such as its processed message count.
    pub stats: ActorStats,

    /// The actor's exported named ports, keyed by port index.
    pub handlers: HashMap<u64, String>,

    /// Snapshots of the actor's children, keyed by child pid.
    pub children: HashMap<Index, ActorTreeSnapshot>,

    /// Events from the tail of the actor's telemetry recording.
    pub events: Vec<Event>,

    /// The actor's recorded span stacks; each stack is a list of span names.
    pub spans: Vec<Vec<String>>,
}
232
233impl Hash for ActorTreeSnapshot {
234 fn hash<H: Hasher>(&self, state: &mut H) {
235 self.pid.hash(state);
236 }
237}
238
239#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
241#[derive(Default)]
242pub struct ActorStats {
243 num_processed_messages: u64,
245}
246
247impl fmt::Display for ActorStats {
248 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249 write!(f, "num_processed_messages={}", self.num_processed_messages)
250 }
251}
252
/// Tracks the root actors spawned on a proc by weak reference, so the proc
/// can snapshot, stop, or abort them without keeping them alive.
#[derive(Debug)]
struct ActorLedger {
    /// Root actor cells keyed by actor ID. Entries whose instance has been
    /// dropped fail to upgrade and are skipped by `snapshot`.
    roots: DashMap<ActorId, WeakInstanceCell>,
}
258
259impl ActorLedger {
260 fn new() -> Self {
261 Self {
262 roots: DashMap::new(),
263 }
264 }
265
266 fn insert(
267 &self,
268 root_actor_id: ActorId,
269 root_actor_cell: WeakInstanceCell,
270 ) -> Result<(), anyhow::Error> {
271 match self.roots.insert(root_actor_id.clone(), root_actor_cell) {
272 None => Ok(()),
273 Some(current_cell) => {
276 let debugging_msg = match current_cell.upgrade() {
277 Some(cell) => format!("the stored cell's actor ID is {}", cell.actor_id()),
278 None => "the stored cell has been dropped".to_string(),
279 };
280
281 Err(anyhow::anyhow!(
282 "actor '{root_actor_id}' has already been added to ledger: {debugging_msg}"
283 ))
284 }
285 }
286 }
287
288 fn snapshot(&self) -> ActorLedgerSnapshot {
290 let roots = self
291 .roots
292 .iter()
293 .flat_map(|r| {
294 let (actor_id, weak_cell) = r.pair();
295 weak_cell
299 .upgrade()
300 .map(|cell| (actor_id.clone(), Self::get_actor_tree_snapshot(&cell)))
301 })
302 .collect();
303
304 ActorLedgerSnapshot { roots }
305 }
306
307 fn get_actor_tree_snapshot(cell: &InstanceCell) -> ActorTreeSnapshot {
308 let children = cell
310 .child_iter()
311 .map(|child| (child.pid(), Self::get_actor_tree_snapshot(child.value())))
312 .collect();
313
314 ActorTreeSnapshot {
315 pid: cell.actor_id().pid(),
316 type_name: cell.inner.actor_type.type_name().to_string(),
317 status: cell.status().borrow().clone(),
318 stats: ActorStats {
319 num_processed_messages: cell.inner.num_processed_messages.load(Ordering::SeqCst),
320 },
321 handlers: cell
322 .inner
323 .exported_named_ports
324 .iter()
325 .map(|entry| (*entry.key(), entry.value().to_string()))
326 .collect(),
327 children,
328 events: cell
329 .inner
330 .recording
331 .tail()
332 .into_iter()
333 .map(Event::from)
334 .collect(),
335 spans: cell
336 .inner
337 .recording
338 .stacks()
339 .into_iter()
340 .map(|stack| {
341 stack
342 .into_iter()
343 .map(|meta| meta.name().to_string())
344 .collect()
345 })
346 .collect(),
347 }
348 }
349}
350
351impl Proc {
352 pub fn new(proc_id: ProcId, forwarder: BoxedMailboxSender) -> Self {
354 Self::new_with_clock(proc_id, forwarder, ClockKind::default())
355 }
356
357 pub fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
359 let (addr, rx) = channel::serve(addr)?;
360 let proc_id = ProcId::Direct(addr, name);
361 let proc = Self::new(proc_id, DialMailboxRouter::new().into_boxed());
362 proc.clone().serve(rx);
363 Ok(proc)
364 }
365
366 pub fn direct_with_default(
368 addr: ChannelAddr,
369 name: String,
370 default: BoxedMailboxSender,
371 ) -> Result<Self, ChannelError> {
372 let (addr, rx) = channel::serve(addr)?;
373 let proc_id = ProcId::Direct(addr, name);
374 let proc = Self::new(
375 proc_id,
376 DialMailboxRouter::new_with_default(default).into_boxed(),
377 );
378 proc.clone().serve(rx);
379 Ok(proc)
380 }
381
382 pub fn new_with_clock(
384 proc_id: ProcId,
385 forwarder: BoxedMailboxSender,
386 clock: ClockKind,
387 ) -> Self {
388 tracing::info!(
389 proc_id = %proc_id,
390 name = "ProcStatus",
391 status = "Created"
392 );
393 Self {
394 inner: Arc::new(ProcState {
395 proc_id,
396 proc_muxer: MailboxMuxer::new(),
397 forwarder,
398 roots: DashMap::new(),
399 ledger: ActorLedger::new(),
400 instances: DashMap::new(),
401 supervision_coordinator_port: OnceLock::new(),
402 clock,
403 }),
404 }
405 }
406
407 pub fn set_supervision_coordinator(
410 &self,
411 port: PortHandle<ActorSupervisionEvent>,
412 ) -> Result<(), anyhow::Error> {
413 self.state()
414 .supervision_coordinator_port
415 .set(port)
416 .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
417 }
418
419 pub fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
422 let result = match self.state().supervision_coordinator_port.get() {
423 Some(port) => port.send(event.clone()).map_err(anyhow::Error::from),
424 None => Err(anyhow::anyhow!(
425 "coordinator port is not set for proc {}",
426 self.proc_id(),
427 )),
428 };
429 if let Err(err) = result {
430 tracing::error!(
431 "proc {}: could not propagate supervision event {} due to error: {:?}: crashing",
432 self.proc_id(),
433 event,
434 err
435 );
436
437 std::process::exit(1);
438 }
439 }
440
441 pub fn local() -> Self {
444 let proc_id = ProcId::Ranked(id!(local), NEXT_LOCAL_RANK.fetch_add(1, Ordering::Relaxed));
447 Proc::new(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
449 }
450
451 pub fn proc_id(&self) -> &ProcId {
453 &self.state().proc_id
454 }
455
456 pub fn forwarder(&self) -> &BoxedMailboxSender {
459 &self.inner.forwarder
460 }
461
462 fn state(&self) -> &ProcState {
464 self.inner.as_ref()
465 }
466
467 pub fn clock(&self) -> &ClockKind {
469 &self.state().clock
470 }
471
472 pub fn ledger_snapshot(&self) -> ActorLedgerSnapshot {
474 self.state().ledger.snapshot()
475 }
476
477 pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
479 let actor_id: ActorId = self.allocate_root_id(name)?;
480 Ok(self.bind_mailbox(actor_id))
481 }
482
483 pub fn attach_child(&self, parent_id: &ActorId) -> Result<Mailbox, anyhow::Error> {
485 let actor_id: ActorId = self.allocate_child_id(parent_id)?;
486 Ok(self.bind_mailbox(actor_id))
487 }
488
489 fn bind_mailbox(&self, actor_id: ActorId) -> Mailbox {
491 let mbox = Mailbox::new(actor_id, BoxedMailboxSender::new(self.downgrade()));
492
493 self.state().proc_muxer.bind_mailbox(mbox.clone());
496 mbox
497 }
498
499 pub fn attach_actor<R, M>(
502 &self,
503 name: &str,
504 ) -> Result<(Instance<()>, ActorRef<R>, PortReceiver<M>), anyhow::Error>
505 where
506 M: RemoteMessage,
507 R: Referable + RemoteHandles<M>,
508 {
509 let (instance, _handle) = self.instance(name)?;
510 let (_handle, rx) = instance.bind_actor_port::<M>();
511 let actor_ref = ActorRef::attest(instance.self_id().clone());
512 Ok((instance, actor_ref, rx))
513 }
514
515 pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> Result<ActorHandle<A>, anyhow::Error> {
518 let actor_id = self.allocate_root_id(name)?;
519 let span = tracing::span!(
520 Level::INFO,
521 "spawn_actor",
522 actor_name = name,
523 actor_type = std::any::type_name::<A>(),
524 actor_id = actor_id.to_string(),
525 );
526 let (instance, mut actor_loop_receivers, work_rx) = {
527 let _guard = span.clone().entered();
528 Instance::new(self.clone(), actor_id.clone(), false, None)
529 };
530 self.state()
534 .ledger
535 .insert(actor_id.clone(), instance.inner.cell.downgrade())?;
536
537 Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx))
538 }
539
540 pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
542 let (instance, handle, ..) = self.actor_instance(name)?;
543
544 Ok((instance, handle))
545 }
546
547 pub fn actor_instance<A: Actor>(
554 &self,
555 name: &str,
556 ) -> Result<
557 (
558 Instance<A>,
559 ActorHandle<A>,
560 PortReceiver<ActorSupervisionEvent>,
561 PortReceiver<Signal>,
562 mpsc::UnboundedReceiver<WorkCell<A>>,
563 ),
564 anyhow::Error,
565 > {
566 let actor_id = self.allocate_root_id(name)?;
567 let span = tracing::debug_span!(
568 "actor_instance",
569 actor_name = name,
570 actor_type = std::any::type_name::<A>(),
571 actor_id = actor_id.to_string(),
572 );
573 let _guard = span.enter();
574 let (instance, actor_loop_receivers, work_rx) =
575 Instance::new(self.clone(), actor_id.clone(), false, None);
576 let (signal_rx, supervision_rx) = actor_loop_receivers.unwrap();
577 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
578 instance.change_status(ActorStatus::Client);
579 Ok((instance, handle, supervision_rx, signal_rx, work_rx))
580 }
581
582 fn child_instance(
584 &self,
585 parent: InstanceCell,
586 ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
587 let actor_id = self.allocate_child_id(parent.actor_id())?;
588 let _ = tracing::debug_span!(
589 "child_actor_instance",
590 parent_actor_id = %parent.actor_id(),
591 actor_type = std::any::type_name::<()>(),
592 actor_id = %actor_id,
593 );
594
595 let (instance, _, _) = Instance::new(self.clone(), actor_id, false, Some(parent));
596 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
597 instance.change_status(ActorStatus::Client);
598 Ok((instance, handle))
599 }
600
601 fn spawn_child<A: Actor>(
607 &self,
608 parent: InstanceCell,
609 actor: A,
610 ) -> Result<ActorHandle<A>, anyhow::Error> {
611 let actor_id = self.allocate_child_id(parent.actor_id())?;
612 let (instance, mut actor_loop_receivers, work_rx) =
613 Instance::new(self.clone(), actor_id, false, Some(parent.clone()));
614 Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx))
615 }
616
617 pub fn abort_root_actor(
621 &self,
622 root: &ActorId,
623 this_handle: Option<&JoinHandle<()>>,
624 ) -> Option<impl Future<Output = ActorId>> {
625 self.state()
626 .ledger
627 .roots
628 .get(root)
629 .into_iter()
630 .flat_map(|e| e.upgrade())
631 .map(|cell| {
632 let r1 = root.clone();
633 let r2 = root.clone();
634 let skip_abort = this_handle.is_some_and(|this_h| {
636 cell.inner
637 .actor_task_handle
638 .get()
639 .is_some_and(|other_h| std::ptr::eq(this_h, other_h))
640 });
641 async move {
647 tokio::task::spawn_blocking(move || {
648 if !skip_abort {
649 let h = cell.inner.actor_task_handle.wait();
650 tracing::debug!("{}: aborting {:?}", r1, h);
651 h.abort();
652 }
653 })
654 .await
655 .unwrap();
656 r2
657 }
658 })
659 .next()
660 }
661
662 pub fn stop_actor(&self, actor_id: &ActorId) -> Option<watch::Receiver<ActorStatus>> {
665 if let Some(entry) = self.state().ledger.roots.get(actor_id) {
666 match entry.value().upgrade() {
667 None => None, Some(cell) => {
669 tracing::info!("sending stop signal to {}", cell.actor_id());
670 if let Err(err) = cell.signal(Signal::DrainAndStop) {
671 tracing::error!(
672 "{}: failed to send stop signal to pid {}: {:?}",
673 self.proc_id(),
674 cell.pid(),
675 err
676 );
677 None
678 } else {
679 Some(cell.status().clone())
680 }
681 }
682 }
683 } else {
684 tracing::error!("no actor {} found in {} roots", actor_id, self.proc_id());
685 None
686 }
687 }
688
689 pub async fn destroy_and_wait<A: Actor>(
697 &mut self,
698 timeout: Duration,
699 cx: Option<&Context<'_, A>>,
700 ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
701 self.destroy_and_wait_except_current::<A>(timeout, cx, false)
702 .await
703 }
704
705 #[hyperactor::instrument]
715 pub async fn destroy_and_wait_except_current<A: Actor>(
716 &mut self,
717 timeout: Duration,
718 cx: Option<&Context<'_, A>>,
719 except_current: bool,
720 ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
721 tracing::debug!("{}: proc stopping", self.proc_id());
722
723 let (this_handle, this_actor_id) = cx.map_or((None, None), |cx| {
724 (
725 Some(cx.actor_task_handle().expect("cannot call destroy_and_wait from inside an actor unless actor has finished starting")),
726 Some(cx.self_id())
727 )
728 });
729
730 let mut statuses = HashMap::new();
731 for actor_id in self
732 .state()
733 .ledger
734 .roots
735 .iter()
736 .map(|entry| entry.key().clone())
737 .collect::<Vec<_>>()
738 {
739 if let Some(status) = self.stop_actor(&actor_id) {
740 statuses.insert(actor_id, status);
741 }
742 }
743 tracing::debug!("{}: proc stopped", self.proc_id());
744
745 let waits: Vec<_> = statuses
746 .iter_mut()
747 .filter(|(actor_id, _)| Some(*actor_id) != this_actor_id)
748 .map(|(actor_id, root)| {
749 let actor_id = actor_id.clone();
750 async move {
751 RealClock
752 .timeout(
753 timeout,
754 root.wait_for(|state: &ActorStatus| state.is_terminal()),
755 )
756 .await
757 .ok()
758 .map(|_| actor_id)
759 }
760 })
761 .collect();
762
763 let results = futures::future::join_all(waits).await;
764 let stopped_actors: Vec<_> = results
765 .iter()
766 .filter_map(|actor_id| actor_id.as_ref())
767 .cloned()
768 .collect();
769 let aborted_actors: Vec<_> = statuses
770 .iter()
771 .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
772 .map(|(actor_id, _)| {
773 let f = self.abort_root_actor(actor_id, this_handle);
774 async move {
775 let _ = if let Some(f) = f { Some(f.await) } else { None };
776 actor_id.clone()
783 }
784 })
785 .collect();
786 let aborted_actors = futures::future::join_all(aborted_actors).await;
787
788 if let Some(this_handle) = this_handle
789 && let Some(this_actor_id) = this_actor_id
790 && !except_current
791 {
792 tracing::debug!("{}: aborting (delayed) {:?}", this_actor_id, this_handle);
793 this_handle.abort()
794 };
795
796 tracing::info!(
797 "destroy_and_wait: {} actors stopped, {} actors aborted",
798 stopped_actors.len(),
799 aborted_actors.len()
800 );
801 Ok((stopped_actors, aborted_actors))
802 }
803
804 pub fn resolve_actor_ref<R: Actor + Referable>(
813 &self,
814 actor_ref: &ActorRef<R>,
815 ) -> Option<ActorHandle<R>> {
816 self.inner
817 .instances
818 .get(actor_ref.actor_id())?
819 .upgrade()?
820 .downcast_handle()
821 }
822
823 fn allocate_root_id(&self, name: &str) -> Result<ActorId, anyhow::Error> {
825 let name = name.to_string();
826 match self.state().roots.entry(name.to_string()) {
827 Entry::Vacant(entry) => {
828 entry.insert(AtomicUsize::new(1));
829 }
830 Entry::Occupied(_) => {
831 anyhow::bail!("an actor with name '{}' has already been spawned", name)
832 }
833 }
834 Ok(ActorId(self.state().proc_id.clone(), name.to_string(), 0))
835 }
836
837 #[hyperactor::instrument(fields(actor_name=parent_id.name()))]
839 pub(crate) fn allocate_child_id(&self, parent_id: &ActorId) -> Result<ActorId, anyhow::Error> {
840 assert_eq!(*parent_id.proc_id(), self.state().proc_id);
841 let pid = match self.state().roots.get(parent_id.name()) {
842 None => anyhow::bail!(
843 "no actor named {} in proc {}",
844 parent_id.name(),
845 self.state().proc_id
846 ),
847 Some(next_pid) => next_pid.fetch_add(1, Ordering::Relaxed),
848 };
849 Ok(parent_id.child_id(pid))
850 }
851
852 fn downgrade(&self) -> WeakProc {
853 WeakProc::new(self)
854 }
855}
856
857#[async_trait]
858impl MailboxSender for Proc {
859 fn post_unchecked(
860 &self,
861 envelope: MessageEnvelope,
862 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
863 ) {
864 if envelope.dest().actor_id().proc_id() == &self.state().proc_id {
865 self.state().proc_muxer.post(envelope, return_handle)
866 } else {
867 self.state().forwarder.post(envelope, return_handle)
868 }
869 }
870}
871
/// A non-owning reference to a proc's shared state. Mailboxes bound on the
/// proc send through it, so they do not keep the proc alive.
#[derive(Debug)]
struct WeakProc(Weak<ProcState>);
874
875impl WeakProc {
876 fn new(proc: &Proc) -> Self {
877 Self(Arc::downgrade(&proc.inner))
878 }
879
880 fn upgrade(&self) -> Option<Proc> {
881 self.0.upgrade().map(|inner| Proc { inner })
882 }
883}
884
885#[async_trait]
886impl MailboxSender for WeakProc {
887 fn post_unchecked(
888 &self,
889 envelope: MessageEnvelope,
890 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
891 ) {
892 match self.upgrade() {
893 Some(proc) => proc.post(envelope, return_handle),
894 None => envelope.undeliverable(
895 DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
896 return_handle,
897 ),
898 }
899 }
900}
901
/// A single queued unit of work for an actor. It is a one-shot closure that,
/// given the actor and its instance, returns the future performing the work.
/// The actor loop receives these from the instance's work queue.
pub struct WorkCell<A: Actor + Send>(
    Box<
        dyn for<'a> FnOnce(
                &'a mut A,
                &'a Instance<A>,
            )
                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
            + Send
            + Sync,
    >,
);
915
916impl<A: Actor + Send> WorkCell<A> {
917 fn new(
919 f: impl for<'a> FnOnce(
920 &'a mut A,
921 &'a Instance<A>,
922 )
923 -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
924 + Send
925 + Sync
926 + 'static,
927 ) -> Self {
928 Self(Box::new(f))
929 }
930
931 pub fn handle<'a>(
933 self,
934 actor: &'a mut A,
935 instance: &'a Instance<A>,
936 ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
937 (self.0)(actor, instance)
938 }
939}
940
/// The context in which an actor handles a message: the actor's instance
/// plus a set of headers. Dereferences to the [`Instance`].
pub struct Context<'a, A: Actor> {
    /// The instance handling the message.
    instance: &'a Instance<A>,
    /// Headers supplied at construction (presumably those of the message
    /// being handled — confirm at call sites).
    headers: Attrs,
}
946
947impl<'a, A: Actor> Context<'a, A> {
948 pub fn new(instance: &'a Instance<A>, headers: Attrs) -> Self {
950 Self { instance, headers }
951 }
952
953 pub fn headers(&self) -> &Attrs {
955 &self.headers
956 }
957}
958
959impl<A: Actor> Deref for Context<'_, A> {
960 type Target = Instance<A>;
961
962 fn deref(&self) -> &Self::Target {
963 self.instance
964 }
965}
966
/// An actor's instance: the actor's interface to the proc. It provides the
/// actor's ID, ports, messaging, and lifecycle control.
pub struct Instance<A: Actor> {
    /// Shared instance state.
    inner: Arc<InstanceState<A>>,
}
973
974impl<A: Actor> fmt::Debug for Instance<A> {
975 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
976 f.debug_struct("Instance").field("inner", &"..").finish()
977 }
978}
979
/// State shared by an [`Instance`].
///
/// Note: fields are dropped in declaration order (after `Drop::drop` runs).
struct InstanceState<A: Actor> {
    /// The proc hosting this instance.
    proc: Proc,

    /// The instance's cell (status receiver, parent link, children, task handle).
    cell: InstanceCell,

    /// The actor's mailbox, bound on the proc's muxer.
    mailbox: Mailbox,

    /// The actor's ports; work sent to them is queued for the actor loop.
    ports: Arc<Ports<A>>,

    /// Publishes the actor's status; the cell holds the matching receiver.
    status_tx: watch::Sender<ActorStatus>,

    /// Unique (v7 UUID) identifier of this instance.
    id: Uuid,

    /// Sequencer constructed from this instance's `id`.
    sequencer: Sequencer,
}
1001
1002impl<A: Actor> InstanceState<A> {
1003 fn self_id(&self) -> &ActorId {
1004 self.mailbox.actor_id()
1005 }
1006}
1007
1008impl<A: Actor> Drop for InstanceState<A> {
1009 fn drop(&mut self) {
1010 self.status_tx.send_if_modified(|status| {
1011 if status.is_terminal() {
1012 false
1013 } else {
1014 tracing::info!(
1015 name = "ActorStatus",
1016 actor_id = %self.self_id(),
1017 actor_name = self.self_id().name(),
1018 status = "Stopped",
1019 prev_status = status.arm().unwrap_or("unknown"),
1020 "instance is dropped",
1021 );
1022 *status = ActorStatus::Stopped;
1023 true
1024 }
1025 });
1026 }
1027}
1028
1029impl<A: Actor> Instance<A> {
1030 fn new(
1032 proc: Proc,
1033 actor_id: ActorId,
1034 detached: bool,
1035 parent: Option<InstanceCell>,
1036 ) -> (
1037 Self,
1038 Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,
1039 mpsc::UnboundedReceiver<WorkCell<A>>,
1040 ) {
1041 let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
1043 let (work_tx, work_rx) = ordered_channel(
1044 actor_id.to_string(),
1045 hyperactor_config::global::get(config::ENABLE_CLIENT_SEQ_ASSIGNMENT),
1046 );
1047 let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
1048 proc.state().proc_muxer.bind_mailbox(mailbox.clone());
1049 let (status_tx, status_rx) = watch::channel(ActorStatus::Created);
1050
1051 let actor_type = match TypeInfo::of::<A>() {
1052 Some(info) => ActorType::Named(info),
1053 None => ActorType::Anonymous(std::any::type_name::<A>()),
1054 };
1055 let actor_loop_ports = if detached {
1056 None
1057 } else {
1058 let (signal_port, signal_receiver) = ports.open_message_port().unwrap();
1059 let (supervision_port, supervision_receiver) = mailbox.open_port();
1060 Some((
1061 (signal_port, supervision_port),
1062 (signal_receiver, supervision_receiver),
1063 ))
1064 };
1065
1066 let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();
1067
1068 let cell = InstanceCell::new(
1069 actor_id,
1070 actor_type,
1071 proc.clone(),
1072 actor_loop,
1073 status_rx,
1074 parent,
1075 ports.clone(),
1076 );
1077 let instance_id = Uuid::now_v7();
1078 let inner = Arc::new(InstanceState {
1079 proc,
1080 cell,
1081 mailbox,
1082 ports,
1083 status_tx,
1084 sequencer: Sequencer::new(instance_id),
1085 id: instance_id,
1086 });
1087 (Self { inner }, actor_loop_receivers, work_rx)
1088 }
1089
    /// Publishes `new` as the actor's status, notifying status watchers.
    ///
    /// Transitions other than `Idle <-> Processing` and no-op changes are
    /// logged as `ActorStatus` events. Because of `#[track_caller]`, the log
    /// names the caller's location.
    ///
    /// # Panics
    ///
    /// Panics on an illegal transition: once the previous status is terminal,
    /// only re-setting the identical status is allowed. The new status has
    /// already been published when this check runs.
    #[track_caller]
    fn change_status(&self, new: ActorStatus) {
        let old = self.inner.status_tx.send_replace(new.clone());
        // Equivalent to `!old.is_terminal() || old == new`.
        assert!(
            !old.is_terminal() && !new.is_terminal()
                || !old.is_terminal() && new.is_terminal()
                || old == new,
            "actor changing status illegally, only allow non-terminal -> non-terminal \
            and non-terminal -> terminal statuses. actor_id={}, prev_status={}, status={}",
            self.self_id(),
            old,
            new
        );
        // Skip logging for the frequent Idle <-> Processing flips and no-ops.
        if !((old.is_idle() && new.is_processing())
            || (old.is_processing() && new.is_idle())
            || old == new)
        {
            let new_status = new.arm().unwrap_or("unknown");
            let change_reason = match new {
                ActorStatus::Failed(reason) => reason.to_string(),
                _ => "".to_string(),
            };
            tracing::info!(
                name = "ActorStatus",
                actor_id = %self.self_id(),
                actor_name = self.self_id().name(),
                status = new_status,
                prev_status = old.arm().unwrap_or("unknown"),
                caller = %Location::caller(),
                change_reason,
            );
        }
    }
1134
1135 fn is_terminal(&self) -> bool {
1136 self.inner.status_tx.borrow().is_terminal()
1137 }
1138
1139 fn is_stopping(&self) -> bool {
1140 self.inner.status_tx.borrow().is_stopping()
1141 }
1142
1143 pub fn self_id(&self) -> &ActorId {
1145 self.inner.self_id()
1146 }
1147
1148 pub fn stop(&self) -> Result<(), ActorError> {
1150 tracing::info!("Instance::stop called, {}", self.inner.cell.actor_id());
1151 self.inner.cell.signal(Signal::DrainAndStop)
1152 }
1153
1154 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1159 self.inner.mailbox.open_port()
1160 }
1161
1162 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1166 self.inner.mailbox.open_once_port()
1167 }
1168
1169 pub fn post(&self, port_id: PortId, headers: Attrs, message: wirevalue::Any) {
1171 <Self as context::MailboxExt>::post(self, port_id, headers, message, true)
1172 }
1173
1174 pub fn self_message_with_delay<M>(&self, message: M, delay: Duration) -> Result<(), ActorError>
1176 where
1177 M: Message,
1178 A: Handler<M>,
1179 {
1180 let port = self.port();
1181 let self_id = self.self_id().clone();
1182 let clock = self.inner.proc.state().clock.clone();
1183 tokio::spawn(async move {
1184 clock.non_advancing_sleep(delay).await;
1185 if let Err(e) = port.send(message) {
1186 tracing::info!("{}: error sending delayed message: {}", self_id, e);
1189 }
1190 });
1191 Ok(())
1192 }
1193
1194 fn start(
1197 self,
1198 actor: A,
1199 actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1200 work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
1201 ) -> ActorHandle<A> {
1202 let instance_cell = self.inner.cell.clone();
1203 let actor_id = self.inner.cell.actor_id().clone();
1204 let actor_handle = ActorHandle::new(self.inner.cell.clone(), self.inner.ports.clone());
1205 let actor_task_handle = A::spawn_server_task(
1206 panic_handler::with_backtrace_tracking(self.serve(
1207 actor,
1208 actor_loop_receivers,
1209 work_rx,
1210 ))
1211 .instrument(Span::current()),
1212 );
1213 tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
1214 instance_cell
1215 .inner
1216 .actor_task_handle
1217 .set(actor_task_handle)
1218 .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));
1219
1220 actor_handle
1221 }
1222
    /// Task body of a started actor. It runs the actor tree to completion,
    /// records the final status, and reports failures.
    ///
    /// Final status:
    /// - on success, `Stopped`;
    /// - on an unhandled supervision event, the status carried by that event;
    /// - on any other error, `Failed`.
    ///
    /// A failure produces a supervision event. If this actor still has a
    /// parent, the event goes to the parent, and the parent is also sent
    /// `ChildStopped`. Otherwise the event is handed to
    /// `Proc::handle_supervision_event`.
    async fn serve(
        mut self,
        mut actor: A,
        actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
    ) {
        let result = self
            .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
            .await;

        // `run_actor_tree` always moves the actor to `Stopping` before returning.
        assert!(self.is_stopping());
        let event = match result {
            Ok(_) => {
                self.change_status(ActorStatus::Stopped);
                None
            }
            Err(err) => {
                match *err.kind {
                    ActorErrorKind::UnhandledSupervisionEvent(box event) => {
                        // Propagate the unhandled event as-is; its status
                        // becomes this actor's final status.
                        assert!(event.actor_status.is_terminal());
                        self.change_status(event.actor_status.clone());
                        Some(event)
                    }
                    _ => {
                        // Any other error fails the actor with a generic error kind.
                        let error_kind = ActorErrorKind::Generic(err.kind.to_string());
                        let status = ActorStatus::Failed(error_kind);
                        self.change_status(status.clone());
                        Some(ActorSupervisionEvent::new(
                            self.inner.cell.actor_id().clone(),
                            actor.display_name(),
                            status,
                            None,
                        ))
                    }
                }
            }
        };

        if let Some(parent) = self.inner.cell.maybe_unlink_parent() {
            if let Some(event) = event {
                parent.send_supervision_event_or_crash(event);
            }
            // The child unlinks itself first, so the parent can assert it is
            // gone when it receives `ChildStopped`.
            if let Err(err) = parent.signal(Signal::ChildStopped(self.inner.cell.pid())) {
                tracing::error!(
                    "{}: failed to send stop message to parent pid {}: {:?}",
                    self.self_id(),
                    parent.pid(),
                    err
                );
            }
        } else {
            // No parent: escalate to the proc, which exits the process if the
            // event cannot be delivered to its supervision coordinator.
            if let Some(event) = event {
                self.inner.proc.handle_supervision_event(event);
            }
        }
    }
1297
    /// Runs the actor's loop, then shuts down its subtree and runs cleanup.
    ///
    /// Steps:
    /// 1. Run `run` (init plus message loop). A panic becomes an
    ///    `ActorErrorKind::panic` error carrying the captured panic info.
    /// 2. Move to `Stopping` and send `Signal::Stop` to every child. Children
    ///    that cannot be signaled are unlinked immediately.
    /// 3. Wait for `ChildStopped` signals until no children remain. If 500ms
    ///    pass without a signal, unlink all remaining children and stop waiting.
    /// 4. Unless the actor panicked, call `actor.cleanup`, bounded by the
    ///    configured `CLEANUP_TIMEOUT`.
    ///
    /// Returns the loop's error if there was one; a cleanup error is then only
    /// logged. Otherwise returns the cleanup result.
    ///
    /// NOTE(review): an error from `signal_receiver.recv()` while waiting for
    /// children returns early via `?`. That replaces `result` and skips
    /// cleanup — confirm this is intended.
    async fn run_actor_tree(
        &mut self,
        actor: &mut A,
        mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
    ) -> Result<(), ActorError> {
        let mut did_panic = false;
        // Run the actor loop, converting a panic into an actor error.
        let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers, work_rx))
            .catch_unwind()
            .await
        {
            Ok(result) => result,
            Err(_) => {
                did_panic = true;
                let panic_info = panic_handler::take_panic_info()
                    .map(|info| info.to_string())
                    .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
                Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::panic(anyhow::anyhow!(panic_info)),
                ))
            }
        };

        assert!(!self.is_terminal());
        self.change_status(ActorStatus::Stopping);
        if let Err(err) = &result {
            tracing::error!("{}: actor failure: {}", self.self_id(), err);
        }

        // Ask every child to stop. Children that cannot be signaled are
        // collected and unlinked after the iteration (presumably to avoid
        // mutating the child map while iterating it).
        let mut to_unlink = Vec::new();
        for child in self.inner.cell.child_iter() {
            if let Err(err) = child.value().signal(Signal::Stop) {
                tracing::error!(
                    "{}: failed to send stop signal to child pid {}: {:?}",
                    self.self_id(),
                    child.key(),
                    err
                );
                to_unlink.push(child.value().clone());
            }
        }
        for child in to_unlink {
            self.inner.cell.unlink(&child);
        }

        // Wait for the remaining children to report `ChildStopped`, giving up
        // after 500ms without any signal.
        let (mut signal_receiver, _) = actor_loop_receivers;
        while self.inner.cell.child_count() > 0 {
            match RealClock
                .timeout(Duration::from_millis(500), signal_receiver.recv())
                .await
            {
                Ok(signal) => {
                    // A stopped child unlinks itself before signaling (see `serve`).
                    if let Signal::ChildStopped(pid) = signal? {
                        assert!(self.inner.cell.get_child(pid).is_none());
                    }
                }
                Err(_) => {
                    tracing::warn!(
                        "timeout waiting for ChildStopped signal from child on actor: {}, ignoring",
                        self.self_id()
                    );
                    self.inner.cell.unlink_all();
                    break;
                }
            }
        }
        // Cleanup is skipped after a panic (presumably because actor state may
        // be inconsistent).
        let cleanup_result = if !did_panic {
            let cleanup_timeout = hyperactor_config::global::get(config::CLEANUP_TIMEOUT);
            match RealClock
                .timeout(cleanup_timeout, actor.cleanup(self, result.as_ref().err()))
                .await
            {
                Ok(Ok(x)) => Ok(x),
                Ok(Err(e)) => Err(ActorError::new(self.self_id(), ActorErrorKind::cleanup(e))),
                Err(e) => Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::cleanup(e.into()),
                )),
            }
        } else {
            Ok(())
        };
        // The original actor error takes precedence; a cleanup error on top of
        // it is only logged.
        if let Err(ref actor_err) = result {
            if let Err(ref err) = cleanup_result {
                tracing::warn!(
                    cleanup_err = %err,
                    %actor_err,
                    "ignoring cleanup error after actor error",
                );
            }
        }
        result.and(cleanup_result)
    }
1413
1414 async fn run(
1416 &mut self,
1417 actor: &mut A,
1418 actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1419 work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
1420 ) -> Result<(), ActorError> {
1421 let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;
1422
1423 self.change_status(ActorStatus::Initializing);
1424 actor
1425 .init(self)
1426 .await
1427 .map_err(|err| ActorError::new(self.self_id(), ActorErrorKind::init(err)))?;
1428 let need_drain;
1429 'messages: loop {
1430 self.change_status(ActorStatus::Idle);
1431 let metric_pairs =
1432 hyperactor_telemetry::kv_pairs!("actor_id" => self.self_id().to_string());
1433 tokio::select! {
1434 work = work_rx.recv() => {
1435 ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
1436 ACTOR_MESSAGE_QUEUE_SIZE.add(-1, metric_pairs);
1437 let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
1438 let work = work.expect("inconsistent work queue state");
1439 if let Err(err) = work.handle(actor, self).await {
1440 for supervision_event in supervision_event_receiver.drain() {
1441 self.handle_supervision_event(actor, supervision_event).await?;
1442 }
1443 let kind = ActorErrorKind::processing(err);
1444 return Err(ActorError {
1445 actor_id: Box::new(self.self_id().clone()),
1446 kind: Box::new(kind),
1447 });
1448 }
1449 }
1450 signal = signal_receiver.recv() => {
1451 let signal = signal.map_err(ActorError::from);
1452 tracing::debug!("Received signal {signal:?}");
1453 match signal? {
1454 signal@(Signal::Stop | Signal::DrainAndStop) => {
1455 need_drain = matches!(signal, Signal::DrainAndStop);
1456 break 'messages;
1457 },
1458 Signal::ChildStopped(pid) => {
1459 assert!(self.inner.cell.get_child(pid).is_none());
1460 },
1461 }
1462 }
1463 Ok(supervision_event) = supervision_event_receiver.recv() => {
1464 self.handle_supervision_event(actor, supervision_event).await?;
1465 }
1466 }
1467 self.inner
1468 .cell
1469 .inner
1470 .num_processed_messages
1471 .fetch_add(1, Ordering::SeqCst);
1472 }
1473
1474 if need_drain {
1475 let mut n = 0;
1476 while let Ok(work) = work_rx.try_recv() {
1477 if let Err(err) = work.handle(actor, self).await {
1478 return Err(ActorError::new(
1479 self.self_id(),
1480 ActorErrorKind::processing(err),
1481 ));
1482 }
1483 n += 1;
1484 }
1485 tracing::debug!("drained {} messages", n);
1486 }
1487 tracing::debug!("exited actor loop: {}", self.self_id());
1488 Ok(())
1489 }
1490
1491 pub async fn handle_supervision_event(
1493 &self,
1494 actor: &mut A,
1495 supervision_event: ActorSupervisionEvent,
1496 ) -> Result<(), ActorError> {
1497 match actor
1499 .handle_supervision_event(self, &supervision_event)
1500 .await
1501 {
1502 Ok(true) => {
1503 Ok(())
1505 }
1506 Ok(false) => {
1507 let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
1508 Err(ActorError::new(self.self_id(), kind))
1509 }
1510 Err(err) => {
1511 let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
1514 err.to_string(),
1515 Box::new(supervision_event),
1516 );
1517 Err(ActorError::new(self.self_id(), kind))
1518 }
1519 }
1520 }
1521
    /// Dispatches one message to the actor's `Handler<M>` implementation.
    ///
    /// Before calling the handler, this marks the instance as
    /// `ActorStatus::Processing`, recording the current time and, when
    /// `type_info` is present, the message type name plus the string returned
    /// by `TypeInfo::arm_unchecked`. It also logs message latency from
    /// `headers` when sampling applies.
    ///
    /// # Safety
    ///
    /// If `type_info` is `Some`, it must describe the concrete type `M`:
    /// `message` is passed to `TypeInfo::arm_unchecked` as an erased pointer.
    /// `Ports::get` upholds this by looking `type_info` up with
    /// `TypeId::of::<M>()`.
    #[hyperactor::instrument(fields(actor_id = self.self_id().to_string(), actor_name = self.self_id().name()))]
    async unsafe fn handle_message<M: Message>(
        &self,
        actor: &mut A,
        type_info: Option<&'static TypeInfo>,
        headers: Attrs,
        message: M,
    ) -> Result<(), anyhow::Error>
    where
        A: Handler<M>,
    {
        let handler = type_info.map(|info| {
            (
                info.typename().to_string(),
                // SAFETY: the caller guarantees `info` describes `M`, so the
                // erased pointer to `message` has the type `arm_unchecked` expects.
                unsafe {
                    info.arm_unchecked(&message as *const M as *const ())
                        .map(str::to_string)
                },
            )
        });

        self.change_status(ActorStatus::Processing(
            self.clock().system_time_now(),
            handler,
        ));
        crate::mailbox::headers::log_message_latency_if_sampling(
            &headers,
            self.self_id().to_string(),
        );

        let context = Context::new(self, headers);
        actor.handle(&context, message).await
    }
1559
1560 pub fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
1562 self.inner.proc.spawn_child(self.inner.cell.clone(), actor)
1563 }
1564
1565 pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> {
1567 self.inner.proc.child_instance(self.inner.cell.clone())
1568 }
1569
1570 pub fn port<M: Message>(&self) -> PortHandle<M>
1573 where
1574 A: Handler<M>,
1575 {
1576 self.inner.ports.get()
1577 }
1578
1579 pub fn handle(&self) -> ActorHandle<A> {
1581 ActorHandle::new(self.inner.cell.clone(), Arc::clone(&self.inner.ports))
1582 }
1583
1584 pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
1586 self.inner.cell.bind(self.inner.ports.as_ref())
1587 }
1588
1589 #[doc(hidden)]
1591 pub fn mailbox_for_py(&self) -> &Mailbox {
1592 &self.inner.mailbox
1593 }
1594
1595 pub fn clock(&self) -> &(impl Clock + use<A>) {
1597 &self.inner.proc.state().clock
1598 }
1599
1600 pub fn proc(&self) -> &Proc {
1602 &self.inner.proc
1603 }
1604
1605 #[doc(hidden)]
1609 pub fn clone_for_py(&self) -> Self {
1610 Self {
1611 inner: Arc::clone(&self.inner),
1612 }
1613 }
1614
1615 fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1617 self.inner.cell.inner.actor_task_handle.get()
1618 }
1619
1620 pub fn sequencer(&self) -> &Sequencer {
1622 &self.inner.sequencer
1623 }
1624
1625 pub fn instance_id(&self) -> Uuid {
1627 self.inner.id
1628 }
1629}
1630
1631impl<A: Actor> context::Mailbox for Instance<A> {
1632 fn mailbox(&self) -> &Mailbox {
1633 &self.inner.mailbox
1634 }
1635}
1636
1637impl<A: Actor> context::Mailbox for Context<'_, A> {
1638 fn mailbox(&self) -> &Mailbox {
1639 &self.instance.inner.mailbox
1640 }
1641}
1642
1643impl<A: Actor> context::Mailbox for &Instance<A> {
1644 fn mailbox(&self) -> &Mailbox {
1645 &self.inner.mailbox
1646 }
1647}
1648
1649impl<A: Actor> context::Mailbox for &Context<'_, A> {
1650 fn mailbox(&self) -> &Mailbox {
1651 &self.instance.inner.mailbox
1652 }
1653}
1654
1655impl<A: Actor> context::Actor for Instance<A> {
1656 type A = A;
1657 fn instance(&self) -> &Instance<A> {
1658 self
1659 }
1660}
1661
1662impl<A: Actor> context::Actor for Context<'_, A> {
1663 type A = A;
1664 fn instance(&self) -> &Instance<A> {
1665 self
1666 }
1667}
1668
1669impl<A: Actor> context::Actor for &Instance<A> {
1670 type A = A;
1671 fn instance(&self) -> &Instance<A> {
1672 self
1673 }
1674}
1675
1676impl<A: Actor> context::Actor for &Context<'_, A> {
1677 type A = A;
1678 fn instance(&self) -> &Instance<A> {
1679 self
1680 }
1681}
1682
1683impl Instance<()> {
1684 pub fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1686 assert!(
1687 self.actor_task_handle().is_none(),
1688 "can only bind actor port on instance with no running actor task"
1689 );
1690 self.inner.mailbox.bind_actor_port()
1691 }
1692}
1693
/// Describes the type of the actor behind an instance cell.
#[derive(Debug)]
enum ActorType {
    /// An actor type with registered type information.
    Named(&'static TypeInfo),
    /// An actor type known only by a static name string.
    Anonymous(&'static str),
}
1699
1700impl ActorType {
1701 fn type_name(&self) -> &'static str {
1703 match self {
1704 Self::Named(info) => info.typename(),
1705 Self::Anonymous(name) => name,
1706 }
1707 }
1708}
1709
/// A cheaply cloneable, shared handle to an actor instance's bookkeeping
/// state: its id, type, status, parent/child links, and ports.
///
/// Clones share the same `InstanceCellState` through an `Arc`.
#[derive(Clone)]
pub struct InstanceCell {
    inner: Arc<InstanceCellState>,
}
1719
1720impl fmt::Debug for InstanceCell {
1721 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1722 f.debug_struct("InstanceCell")
1723 .field("actor_id", &self.inner.actor_id)
1724 .field("actor_type", &self.inner.actor_type)
1725 .finish()
1726 }
1727}
1728
/// The shared state behind an [`InstanceCell`].
///
/// On drop, it unlinks itself from its parent and removes itself from the
/// proc's instance table.
struct InstanceCellState {
    /// The id of the actor this state belongs to.
    actor_id: ActorId,

    /// The actor's type (named or anonymous).
    actor_type: ActorType,

    /// The proc hosting this instance. The instance is registered in its
    /// instance table on creation.
    proc: Proc,

    /// Ports feeding the actor's control loop: signals and supervision events.
    /// `None` for a detached instance that has no control loop.
    actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,

    /// Observes the actor's current status.
    status: watch::Receiver<ActorStatus>,

    /// Weak back-reference to the parent cell; empty for root instances.
    parent: WeakInstanceCell,

    /// Strong references to linked children, keyed by child pid.
    children: DashMap<Index, InstanceCell>,

    /// The task running the actor, recorded at most once.
    actor_task_handle: OnceLock<JoinHandle<()>>,

    /// Port index -> message type name for every port exported by `bind`.
    exported_named_ports: DashMap<u64, &'static str>,

    /// Counter incremented by the actor loop after each handled event.
    num_processed_messages: AtomicU64,

    /// Telemetry recording associated with this instance (created with
    /// `record(64)`; the exact meaning of 64 is defined by the recorder API).
    recording: Recording,

    /// Type-erased `Ports<A>` for the actor; recovered via `downcast_handle`.
    ports: Arc<dyn Any + Send + Sync>,
}
1768
1769impl InstanceCellState {
1770 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1773 self.parent
1774 .upgrade()
1775 .filter(|parent| parent.inner.unlink(self))
1776 }
1777
1778 fn unlink(&self, child: &InstanceCellState) -> bool {
1780 assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
1781 self.children.remove(&child.actor_id.pid()).is_some()
1782 }
1783}
1784
1785impl InstanceCell {
1786 fn new(
1789 actor_id: ActorId,
1790 actor_type: ActorType,
1791 proc: Proc,
1792 actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
1793 status: watch::Receiver<ActorStatus>,
1794 parent: Option<InstanceCell>,
1795 ports: Arc<dyn Any + Send + Sync>,
1796 ) -> Self {
1797 let _ais = actor_id.to_string();
1798 let cell = Self {
1799 inner: Arc::new(InstanceCellState {
1800 actor_id: actor_id.clone(),
1801 actor_type,
1802 proc: proc.clone(),
1803 actor_loop,
1804 status,
1805 parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
1806 children: DashMap::new(),
1807 actor_task_handle: OnceLock::new(),
1808 exported_named_ports: DashMap::new(),
1809 num_processed_messages: AtomicU64::new(0),
1810 recording: hyperactor_telemetry::recorder().record(64),
1811 ports,
1812 }),
1813 };
1814 cell.maybe_link_parent();
1815 proc.inner
1816 .instances
1817 .insert(actor_id.clone(), cell.downgrade());
1818 cell
1819 }
1820
1821 fn wrap(inner: Arc<InstanceCellState>) -> Self {
1822 Self { inner }
1823 }
1824
1825 pub(crate) fn actor_id(&self) -> &ActorId {
1827 &self.inner.actor_id
1828 }
1829
1830 pub(crate) fn pid(&self) -> Index {
1832 self.inner.actor_id.pid()
1833 }
1834
1835 #[allow(dead_code)]
1837 pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1838 self.inner.actor_task_handle.get()
1839 }
1840
1841 pub(crate) fn status(&self) -> &watch::Receiver<ActorStatus> {
1843 &self.inner.status
1844 }
1845
1846 pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
1848 if let Some((signal_port, _)) = &self.inner.actor_loop {
1849 signal_port.send(signal).map_err(ActorError::from)
1850 } else {
1851 tracing::warn!(
1852 "{}: attempted to send signal {} to detached actor",
1853 self.inner.actor_id,
1854 signal
1855 );
1856 Ok(())
1857 }
1858 }
1859
1860 pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) {
1869 match &self.inner.actor_loop {
1870 Some((_, supervision_port)) => {
1871 if let Err(err) = supervision_port.send(event) {
1872 tracing::error!(
1873 "{}: failed to send supervision event to actor: {:?}. Crash the process.",
1874 self.actor_id(),
1875 err
1876 );
1877 std::process::exit(1);
1878 }
1879 }
1880 None => {
1881 tracing::error!(
1882 "{}: failed: {}: cannot send supervision event to detached actor: crashing",
1883 self.actor_id(),
1884 event,
1885 );
1886 std::process::exit(1);
1887 }
1888 }
1889 }
1890
1891 pub fn downgrade(&self) -> WeakInstanceCell {
1893 WeakInstanceCell {
1894 inner: Arc::downgrade(&self.inner),
1895 }
1896 }
1897
1898 fn link(&self, child: InstanceCell) {
1900 assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1901 self.inner.children.insert(child.pid(), child);
1902 }
1903
1904 fn unlink(&self, child: &InstanceCell) {
1906 assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1907 self.inner.children.remove(&child.pid());
1908 }
1909
1910 fn unlink_all(&self) {
1912 self.inner.children.clear();
1913 }
1914
1915 fn maybe_link_parent(&self) {
1917 if let Some(parent) = self.inner.parent.upgrade() {
1918 parent.link(self.clone());
1919 }
1920 }
1921
1922 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1925 self.inner.maybe_unlink_parent()
1926 }
1927
1928 #[allow(dead_code)]
1930 fn get_parent_cell(&self) -> Option<InstanceCell> {
1931 self.inner.parent.upgrade()
1932 }
1933
1934 fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, Index, InstanceCell>> {
1937 self.inner.children.iter()
1938 }
1939
1940 fn child_count(&self) -> usize {
1942 self.inner.children.len()
1943 }
1944
1945 fn get_child(&self, pid: Index) -> Option<InstanceCell> {
1947 self.inner.children.get(&pid).map(|child| child.clone())
1948 }
1949
1950 pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &Ports<A>) -> ActorRef<R> {
1953 <R as Binds<A>>::bind(ports);
1954 ports.bind::<Signal>();
1956 ports.bind::<Undeliverable<MessageEnvelope>>();
1957 for entry in ports.bound.iter() {
1959 self.inner
1960 .exported_named_ports
1961 .insert(*entry.key(), entry.value());
1962 }
1963 ActorRef::attest(self.actor_id().clone())
1964 }
1965
1966 pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
1968 let ports = Arc::clone(&self.inner.ports).downcast::<Ports<A>>().ok()?;
1969 Some(ActorHandle::new(self.clone(), ports))
1970 }
1971}
1972
1973impl Drop for InstanceCellState {
1974 fn drop(&mut self) {
1975 if let Some(parent) = self.maybe_unlink_parent() {
1976 tracing::debug!(
1977 "instance {} was dropped with parent {} still linked",
1978 self.actor_id,
1979 parent.actor_id()
1980 );
1981 }
1982 if self.proc.inner.instances.remove(&self.actor_id).is_none() {
1983 tracing::error!("instance {} was dropped but not in proc", self.actor_id);
1984 }
1985 }
1986}
1987
/// A non-owning reference to an [`InstanceCell`]'s state.
///
/// Call `upgrade` to obtain a strong cell while the state is still alive.
#[derive(Debug, Clone)]
pub struct WeakInstanceCell {
    inner: Weak<InstanceCellState>,
}
1994
1995impl Default for WeakInstanceCell {
1996 fn default() -> Self {
1997 Self::new()
1998 }
1999}
2000
2001impl WeakInstanceCell {
2002 pub fn new() -> Self {
2004 Self { inner: Weak::new() }
2005 }
2006
2007 pub fn upgrade(&self) -> Option<InstanceCell> {
2009 self.inner.upgrade().map(InstanceCell::wrap)
2010 }
2011}
2012
/// The table of message ports owned by an actor of type `A`.
///
/// Handler ports created here enqueue incoming messages as work cells on the
/// actor's work queue.
pub struct Ports<A: Actor> {
    // TypeId of message type `M` -> boxed `PortHandle<M>`.
    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
    // Port index -> type name of the remote message bound at that index.
    bound: DashMap<u64, &'static str>,
    // The actor's mailbox, used to open ports.
    mailbox: Mailbox,
    // Sender side of the actor's work queue.
    workq: OrderedSender<WorkCell<A>>,
}
2023
/// Sequencing information attached to a message (via the `SEQ_INFO` header).
///
/// When the work queue buffers messages, this orders them within a session.
/// Its text form is `"<session_id>:<seq>"` (see the `Display` and `FromStr`
/// impls).
#[derive(Serialize, Deserialize, Clone, typeuri::Named, AttrValue)]
pub struct SeqInfo {
    /// The session the sequence number belongs to.
    pub session_id: Uuid,
    /// Position of the message within its session. When buffering is enabled,
    /// the work queue rejects 0 (see `Ports::get`).
    pub seq: u64,
}
wirevalue::register_type!(SeqInfo);
2033
2034impl fmt::Display for SeqInfo {
2035 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2036 write!(f, "{}:{}", self.session_id, self.seq)
2037 }
2038}
2039
2040impl std::str::FromStr for SeqInfo {
2041 type Err = anyhow::Error;
2042
2043 fn from_str(s: &str) -> Result<Self, Self::Err> {
2044 let parts: Vec<_> = s.split(':').collect();
2045 if parts.len() != 2 {
2046 return Err(anyhow::anyhow!("invalid SeqInfo: {}", s));
2047 }
2048 let session_id: Uuid = parts[0].parse()?;
2049 let seq: u64 = parts[1].parse()?;
2050 Ok(SeqInfo { session_id, seq })
2051 }
2052}
2053
// Message header carrying `SeqInfo`. `Ports::get` requires it on every message
// when the work queue has buffering enabled.
declare_attrs! {
    pub attr SEQ_INFO: SeqInfo;
}
2059
impl<A: Actor> Ports<A> {
    /// Creates an empty port table that opens ports on `mailbox` and enqueues
    /// work on `workq`.
    fn new(mailbox: Mailbox, workq: OrderedSender<WorkCell<A>>) -> Self {
        Self {
            ports: DashMap::new(),
            bound: DashMap::new(),
            mailbox,
            workq,
        }
    }

    /// Returns the handler port for message type `M`, opening it on first use.
    ///
    /// Messages sent to the port are wrapped in a `WorkCell` that calls
    /// `Instance::handle_message`, and the cell is pushed onto the work queue.
    /// With buffering enabled, the message's `SEQ_INFO` header is used for
    /// ordered delivery.
    ///
    /// # Panics
    ///
    /// - Panics if `M` is `Signal`.
    /// - The enqueue closure panics if buffering is enabled and a message
    ///   arrives without a `SEQ_INFO` header.
    pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
    where
        A: Handler<M>,
    {
        let key = TypeId::of::<M>();
        match self.ports.entry(key) {
            Entry::Vacant(entry) => {
                // The Signal port must not be provisioned as a handler port.
                // NOTE(review): presumably it is opened via `open_message_port`
                // instead — confirm.
                assert_ne!(
                    key,
                    TypeId::of::<Signal>(),
                    "cannot provision Signal port through `Ports::get`"
                );

                let type_info = TypeInfo::get_by_typeid(key);
                let workq = self.workq.clone();
                let actor_id = self.mailbox.actor_id().to_string();
                let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
                    let seq_info = headers.get(SEQ_INFO).cloned();

                    let work = WorkCell::new(move |actor: &mut A, instance: &Instance<A>| {
                        Box::pin(async move {
                            // SAFETY: `type_info` was looked up with
                            // `TypeId::of::<M>()` above, so it describes the
                            // type of `msg` as `handle_message` requires.
                            unsafe {
                                instance
                                    .handle_message(actor, type_info, headers, msg)
                                    .await
                            }
                        })
                    });
                    ACTOR_MESSAGE_QUEUE_SIZE.add(
                        1,
                        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.clone()),
                    );
                    if workq.enable_buffering {
                        let SeqInfo { session_id, seq } =
                            seq_info.expect("SEQ_INFO must be set when buffering is enabled");

                        // Map ordered-sender failures onto `anyhow` errors.
                        workq.send(session_id, seq, work).map_err(|e| match e {
                            OrderedSenderError::InvalidZeroSeq(_) => {
                                anyhow::anyhow!("seq must be greater than 0")
                            }
                            OrderedSenderError::SendError(e) => anyhow::Error::from(e),
                            OrderedSenderError::FlushError(e) => e,
                        })
                    } else {
                        workq.direct_send(work).map_err(anyhow::Error::from)
                    }
                });
                entry.insert(Box::new(port.clone()));
                port
            }
            Entry::Occupied(entry) => {
                // Entries keyed by `TypeId::of::<M>()` always hold a
                // `PortHandle<M>` (see both insertion sites), so this cannot fail.
                let port = entry.get();
                port.downcast_ref::<PortHandle<M>>().unwrap().clone()
            }
        }
    }

    /// Opens a plain (receiver-based) port for `M` and records it in the table.
    ///
    /// Returns `None` if a port for `M` already exists.
    pub(crate) fn open_message_port<M: Message>(&self) -> Option<(PortHandle<M>, PortReceiver<M>)> {
        match self.ports.entry(TypeId::of::<M>()) {
            Entry::Vacant(entry) => {
                let (port, receiver) = self.mailbox.open_port();
                entry.insert(Box::new(port.clone()));
                Some((port, receiver))
            }
            Entry::Occupied(_) => None,
        }
    }

    /// Binds the handler port for remote message `M` at its well-known port
    /// index (`M::port()`), so it can be addressed remotely.
    ///
    /// Binding the same type again does nothing.
    ///
    /// # Panics
    ///
    /// Panics if the port index is already bound to a different type.
    pub fn bind<M: RemoteMessage>(&self)
    where
        A: Handler<M>,
    {
        let port_index = M::port();
        match self.bound.entry(port_index) {
            Entry::Vacant(entry) => {
                self.get::<M>().bind_actor_port();
                entry.insert(M::typename());
            }
            Entry::Occupied(entry) => {
                assert_eq!(
                    *entry.get(),
                    M::typename(),
                    "bind {}: port index {} already bound to type {}",
                    M::typename(),
                    port_index,
                    entry.get(),
                );
            }
        }
    }
}
2169
2170#[cfg(test)]
2171mod tests {
2172 use std::assert_matches::assert_matches;
2173 use std::sync::atomic::AtomicBool;
2174
2175 use hyperactor_macros::export;
2176 use maplit::hashmap;
2177 use serde_json::json;
2178 use timed_test::async_timed_test;
2179 use tokio::sync::Barrier;
2180 use tokio::sync::oneshot;
2181 use tracing::Level;
2182 use tracing_subscriber::layer::SubscriberExt;
2183 use tracing_test::internal::logs_with_scope_contain;
2184
2185 use super::*;
2186 use crate as hyperactor;
2188 use crate::HandleClient;
2189 use crate::Handler;
2190 use crate::OncePortRef;
2191 use crate::PortRef;
2192 use crate::clock::RealClock;
2193 use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
2194 use crate::test_utils::process_assertion::assert_termination;
2195
2196 impl ActorTreeSnapshot {
2197 #[allow(dead_code)]
2198 fn empty(pid: Index) -> Self {
2199 Self {
2200 pid,
2201 type_name: String::new(),
2202 status: ActorStatus::Idle,
2203 stats: ActorStats::default(),
2204 handlers: HashMap::new(),
2205 children: HashMap::new(),
2206 events: Vec::new(),
2207 spans: Vec::new(),
2208 }
2209 }
2210
2211 fn empty_typed(pid: Index, type_name: String) -> Self {
2212 Self {
2213 pid,
2214 type_name,
2215 status: ActorStatus::Idle,
2216 stats: ActorStats::default(),
2217 handlers: HashMap::new(),
2218 children: HashMap::new(),
2219 events: Vec::new(),
2220 spans: Vec::new(),
2221 }
2222 }
2223 }
2224
2225 #[derive(Debug, Default)]
2226 #[export]
2227 struct TestActor;
2228
2229 impl Actor for TestActor {}
2230
2231 #[derive(Handler, HandleClient, Debug)]
2232 enum TestActorMessage {
2233 Reply(oneshot::Sender<()>),
2234 Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
2235 Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
2236 Noop(),
2237 Fail(anyhow::Error),
2238 Panic(String),
2239 Spawn(oneshot::Sender<ActorHandle<TestActor>>),
2240 }
2241
2242 impl TestActor {
2243 async fn spawn_child(
2244 cx: &impl context::Actor,
2245 parent: &ActorHandle<TestActor>,
2246 ) -> ActorHandle<TestActor> {
2247 let (tx, rx) = oneshot::channel();
2248 parent.send(cx, TestActorMessage::Spawn(tx)).unwrap();
2249 rx.await.unwrap()
2250 }
2251 }
2252
2253 #[async_trait]
2254 #[crate::forward(TestActorMessage)]
2255 impl TestActorMessageHandler for TestActor {
2256 async fn reply(
2257 &mut self,
2258 _cx: &crate::Context<Self>,
2259 sender: oneshot::Sender<()>,
2260 ) -> Result<(), anyhow::Error> {
2261 sender.send(()).unwrap();
2262 Ok(())
2263 }
2264
2265 async fn wait(
2266 &mut self,
2267 _cx: &crate::Context<Self>,
2268 sender: oneshot::Sender<()>,
2269 receiver: oneshot::Receiver<()>,
2270 ) -> Result<(), anyhow::Error> {
2271 sender.send(()).unwrap();
2272 receiver.await.unwrap();
2273 Ok(())
2274 }
2275
2276 async fn forward(
2277 &mut self,
2278 cx: &crate::Context<Self>,
2279 destination: ActorHandle<TestActor>,
2280 message: Box<TestActorMessage>,
2281 ) -> Result<(), anyhow::Error> {
2282 destination.send(cx, *message)?;
2284 Ok(())
2285 }
2286
2287 async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
2288 Ok(())
2289 }
2290
2291 async fn fail(
2292 &mut self,
2293 _cx: &crate::Context<Self>,
2294 err: anyhow::Error,
2295 ) -> Result<(), anyhow::Error> {
2296 Err(err)
2297 }
2298
2299 async fn panic(
2300 &mut self,
2301 _cx: &crate::Context<Self>,
2302 err_msg: String,
2303 ) -> Result<(), anyhow::Error> {
2304 panic!("{}", err_msg);
2305 }
2306
2307 async fn spawn(
2308 &mut self,
2309 cx: &crate::Context<Self>,
2310 reply: oneshot::Sender<ActorHandle<TestActor>>,
2311 ) -> Result<(), anyhow::Error> {
2312 let handle = TestActor.spawn(cx)?;
2313 reply.send(handle).unwrap();
2314 Ok(())
2315 }
2316 }
2317
2318 #[tracing_test::traced_test]
2319 #[async_timed_test(timeout_secs = 30)]
2320 async fn test_spawn_actor() {
2321 let proc = Proc::local();
2322 let (client, _) = proc.instance("client").unwrap();
2323 let handle = proc.spawn("test", TestActor).unwrap();
2324
2325 assert!(logs_contain(
2327 format!(
2328 "{}: spawned with {:?}",
2329 handle.actor_id(),
2330 handle.cell().actor_task_handle().unwrap(),
2331 )
2332 .as_str()
2333 ));
2334
2335 let mut state = handle.status().clone();
2336
2337 let (tx, rx) = oneshot::channel::<()>();
2340 handle.send(&client, TestActorMessage::Reply(tx)).unwrap();
2341 rx.await.unwrap();
2342
2343 state
2344 .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
2345 .await
2346 .unwrap();
2347
2348 let (enter_tx, enter_rx) = oneshot::channel::<()>();
2350 let (exit_tx, exit_rx) = oneshot::channel::<()>();
2351
2352 handle
2353 .send(&client, TestActorMessage::Wait(enter_tx, exit_rx))
2354 .unwrap();
2355 enter_rx.await.unwrap();
2356 assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= RealClock.system_time_now());
2357 exit_tx.send(()).unwrap();
2358
2359 state
2360 .wait_for(|state| matches!(*state, ActorStatus::Idle))
2361 .await
2362 .unwrap();
2363
2364 handle.drain_and_stop().unwrap();
2365 handle.await;
2366 assert_matches!(*state.borrow(), ActorStatus::Stopped);
2367 }
2368
2369 #[async_timed_test(timeout_secs = 30)]
2370 async fn test_proc_actors_messaging() {
2371 let proc = Proc::local();
2372 let (client, _) = proc.instance("client").unwrap();
2373 let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
2374 let second = proc.spawn::<TestActor>("second", TestActor).unwrap();
2375 let (tx, rx) = oneshot::channel::<()>();
2376 let reply_message = TestActorMessage::Reply(tx);
2377 first
2378 .send(
2379 &client,
2380 TestActorMessage::Forward(second, Box::new(reply_message)),
2381 )
2382 .unwrap();
2383 rx.await.unwrap();
2384 }
2385
2386 #[derive(Debug, Default)]
2387 #[export]
2388 struct LookupTestActor;
2389
2390 impl Actor for LookupTestActor {}
2391
2392 #[derive(Handler, HandleClient, Debug)]
2393 enum LookupTestMessage {
2394 ActorExists(ActorRef<TestActor>, #[reply] OncePortRef<bool>),
2395 }
2396
2397 #[async_trait]
2398 #[crate::forward(LookupTestMessage)]
2399 impl LookupTestMessageHandler for LookupTestActor {
2400 async fn actor_exists(
2401 &mut self,
2402 cx: &crate::Context<Self>,
2403 actor_ref: ActorRef<TestActor>,
2404 ) -> Result<bool, anyhow::Error> {
2405 Ok(actor_ref.downcast_handle(cx).is_some())
2406 }
2407 }
2408
2409 #[async_timed_test(timeout_secs = 30)]
2410 async fn test_actor_lookup() {
2411 let proc = Proc::local();
2412 let (client, _handle) = proc.instance("client").unwrap();
2413
2414 let target_actor = proc.spawn::<TestActor>("target", TestActor).unwrap();
2415 let target_actor_ref = target_actor.bind();
2416 let lookup_actor = proc
2417 .spawn::<LookupTestActor>("lookup", LookupTestActor)
2418 .unwrap();
2419
2420 assert!(
2421 lookup_actor
2422 .actor_exists(&client, target_actor_ref.clone())
2423 .await
2424 .unwrap()
2425 );
2426
2427 assert!(
2429 !lookup_actor
2430 .actor_exists(
2431 &client,
2432 ActorRef::attest(target_actor.actor_id().child_id(123).clone())
2433 )
2434 .await
2435 .unwrap()
2436 );
2437 assert!(
2439 !lookup_actor
2440 .actor_exists(&client, ActorRef::attest(lookup_actor.actor_id().clone()))
2441 .await
2442 .unwrap()
2443 );
2444
2445 target_actor.drain_and_stop().unwrap();
2446 target_actor.await;
2447
2448 assert!(
2449 !lookup_actor
2450 .actor_exists(&client, target_actor_ref)
2451 .await
2452 .unwrap()
2453 );
2454
2455 lookup_actor.drain_and_stop().unwrap();
2456 lookup_actor.await;
2457 }
2458
2459 fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
2460 assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id());
2461 assert_eq!(
2462 child.inner.parent.upgrade().unwrap().actor_id(),
2463 parent.actor_id()
2464 );
2465 assert_matches!(
2466 parent.inner.children.get(&child.pid()),
2467 Some(node) if node.actor_id() == child.actor_id()
2468 );
2469 }
2470
2471 #[tracing_test::traced_test]
2472 #[async_timed_test(timeout_secs = 30)]
2473 async fn test_spawn_child() {
2474 let proc = Proc::local();
2475 let (client, _) = proc.instance("client").unwrap();
2476
2477 let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
2478 let second = TestActor::spawn_child(&client, &first).await;
2479 let third = TestActor::spawn_child(&client, &second).await;
2480
2481 assert!(logs_with_scope_contain(
2483 "hyperactor::proc",
2484 format!(
2485 "{}: spawned with {:?}",
2486 first.actor_id(),
2487 first.cell().actor_task_handle().unwrap()
2488 )
2489 .as_str()
2490 ));
2491 assert!(logs_with_scope_contain(
2492 "hyperactor::proc",
2493 format!(
2494 "{}: spawned with {:?}",
2495 second.actor_id(),
2496 second.cell().actor_task_handle().unwrap()
2497 )
2498 .as_str()
2499 ));
2500 assert!(logs_with_scope_contain(
2501 "hyperactor::proc",
2502 format!(
2503 "{}: spawned with {:?}",
2504 third.actor_id(),
2505 third.cell().actor_task_handle().unwrap()
2506 )
2507 .as_str()
2508 ));
2509
2510 assert_eq!(first.actor_id().proc_id(), proc.proc_id());
2512 assert_eq!(second.actor_id(), &first.actor_id().child_id(1));
2513 assert_eq!(third.actor_id(), &first.actor_id().child_id(2));
2514
2515 validate_link(third.cell(), second.cell());
2517 validate_link(second.cell(), first.cell());
2518 assert!(first.cell().inner.parent.upgrade().is_none());
2519
2520 let third_cell = third.cell().clone();
2523 third.drain_and_stop().unwrap();
2524 third.await;
2525 assert!(third_cell.inner.children.is_empty());
2526 drop(third_cell);
2527 validate_link(second.cell(), first.cell());
2528
2529 let second_cell = second.cell().clone();
2530 second.drain_and_stop().unwrap();
2531 second.await;
2532 assert!(second_cell.inner.children.is_empty());
2533 drop(second_cell);
2534
2535 let first_cell = first.cell().clone();
2536 first.drain_and_stop().unwrap();
2537 first.await;
2538 assert!(first_cell.inner.children.is_empty());
2539 }
2540
2541 #[async_timed_test(timeout_secs = 30)]
2542 async fn test_child_lifecycle() {
2543 let proc = Proc::local();
2544 let (client, _) = proc.instance("client").unwrap();
2545
2546 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
2547 let root_1 = TestActor::spawn_child(&client, &root).await;
2548 let root_2 = TestActor::spawn_child(&client, &root).await;
2549 let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
2550
2551 root.drain_and_stop().unwrap();
2552 root.await;
2553
2554 for actor in [root_1, root_2, root_2_1] {
2555 assert!(actor.send(&client, TestActorMessage::Noop()).is_err());
2556 assert_matches!(actor.await, ActorStatus::Stopped);
2557 }
2558 }
2559
2560 #[async_timed_test(timeout_secs = 30)]
2561 async fn test_parent_failure() {
2562 let proc = Proc::local();
2563 let (client, _) = proc.instance("client").unwrap();
2564 ProcSupervisionCoordinator::set(&proc).await.unwrap();
2567
2568 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
2569 let root_1 = TestActor::spawn_child(&client, &root).await;
2570 let root_2 = TestActor::spawn_child(&client, &root).await;
2571 let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
2572
2573 root_2
2574 .send(
2575 &client,
2576 TestActorMessage::Fail(anyhow::anyhow!("some random failure")),
2577 )
2578 .unwrap();
2579 let _root_2_actor_id = root_2.actor_id().clone();
2580 assert_matches!(
2581 root_2.await,
2582 ActorStatus::Failed(err) if err.to_string() == "some random failure"
2583 );
2584
2585 assert_matches!(
2589 root.await,
2590 ActorStatus::Failed(err) if err.to_string().contains("some random failure")
2591 );
2592 assert_eq!(root_2_1.await, ActorStatus::Stopped);
2593 assert_eq!(root_1.await, ActorStatus::Stopped);
2594 }
2595
2596 #[async_timed_test(timeout_secs = 30)]
2597 #[cfg_attr(not(fbcode_build), ignore)]
2600 async fn test_actor_ledger() {
2601 async fn wait_until_idle(actor_handle: &ActorHandle<TestActor>) {
2602 actor_handle
2603 .status()
2604 .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
2605 .await
2606 .unwrap();
2607 }
2608
2609 let proc = Proc::local();
2610 let (client, _) = proc.instance("client").unwrap();
2611
2612 let root: ActorHandle<TestActor> = proc.spawn("root", TestActor).unwrap();
2614 wait_until_idle(&root).await;
2615 {
2616 let snapshot = proc.state().ledger.snapshot();
2617 assert_eq!(
2618 snapshot.roots,
2619 hashmap! {
2620 root.actor_id().clone() =>
2621 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2622 },
2623 );
2624 }
2625
2626 let another_root: ActorHandle<TestActor> = proc.spawn("another_root", TestActor).unwrap();
2628 wait_until_idle(&another_root).await;
2629 {
2630 let snapshot = proc.state().ledger.snapshot();
2631 assert_eq!(
2632 snapshot.roots,
2633 hashmap! {
2634 root.actor_id().clone() =>
2635 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2636 another_root.actor_id().clone() =>
2637 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2638 },
2639 );
2640 }
2641
2642 another_root.drain_and_stop().unwrap();
2645 another_root.await;
2646 {
2647 let snapshot = proc.state().ledger.snapshot();
2648 assert_eq!(
2649 snapshot.roots,
2650 hashmap! { root.actor_id().clone() =>
2651 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2652 },
2653 );
2654 }
2655
2656 let root_1 = TestActor::spawn_child(&client, &root).await;
2662 wait_until_idle(&root_1).await;
2663 wait_until_idle(&root).await;
2665 {
2666 let snapshot = proc.state().ledger.snapshot();
2667 assert_eq!(
2668 snapshot.roots,
2669 hashmap! {
2670 root.actor_id().clone() => ActorTreeSnapshot {
2671 pid: 0,
2672 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2673 status: ActorStatus::Idle,
2674 stats: ActorStats { num_processed_messages: 1 },
2675 handlers: HashMap::new(),
2676 children: hashmap! {
2677 root_1.actor_id().pid() =>
2678 ActorTreeSnapshot::empty_typed(
2679 root_1.actor_id().pid(),
2680 "hyperactor::proc::tests::TestActor".to_string()
2681 )
2682 },
2683 events: Vec::new(),
2684 spans: Vec::new(),
2685 }
2686 },
2687 );
2688 }
2689
2690 let root_1_1 = TestActor::spawn_child(&client, &root_1).await;
2691 wait_until_idle(&root_1_1).await;
2692 wait_until_idle(&root_1).await;
2693 {
2694 let snapshot = proc.state().ledger.snapshot();
2695 assert_eq!(
2696 snapshot.roots,
2697 hashmap! {
2698 root.actor_id().clone() => ActorTreeSnapshot {
2699 pid: 0,
2700 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2701 status: ActorStatus::Idle,
2702 stats: ActorStats { num_processed_messages: 1 },
2703 handlers: HashMap::new(),
2704 children: hashmap!{
2705 root_1.actor_id().pid() =>
2706 ActorTreeSnapshot {
2707 pid: root_1.actor_id().pid(),
2708 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2709 status: ActorStatus::Idle,
2710 stats: ActorStats { num_processed_messages: 1 },
2711 handlers: HashMap::new(),
2712 children: hashmap!{
2713 root_1_1.actor_id().pid() =>
2714 ActorTreeSnapshot::empty_typed(
2715 root_1_1.actor_id().pid(),
2716 "hyperactor::proc::tests::TestActor".to_string()
2717 )
2718 },
2719 events: Vec::new(),
2720 spans: Vec::new(),
2721 }
2722 },
2723 events: Vec::new(),
2724 spans: Vec::new(),
2725 },
2726 }
2727 );
2728 }
2729
2730 let root_2 = TestActor::spawn_child(&client, &root).await;
2731 wait_until_idle(&root_2).await;
2732 wait_until_idle(&root).await;
2733 {
2734 let snapshot = proc.state().ledger.snapshot();
2735 assert_eq!(
2736 snapshot.roots,
2737 hashmap! {
2738 root.actor_id().clone() => ActorTreeSnapshot {
2739 pid: 0,
2740 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2741 status: ActorStatus::Idle,
2742 stats: ActorStats { num_processed_messages: 2 },
2743 handlers: HashMap::new(),
2744 children: hashmap!{
2745 root_2.actor_id().pid() =>
2746 ActorTreeSnapshot{
2747 pid: root_2.actor_id().pid(),
2748 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2749 status: ActorStatus::Idle,
2750 stats: ActorStats::default(),
2751 handlers: HashMap::new(),
2752 children: HashMap::new(),
2753 events: Vec::new(),
2754 spans: Vec::new(),
2755 },
2756 root_1.actor_id().pid() =>
2757 ActorTreeSnapshot{
2758 pid: root_1.actor_id().pid(),
2759 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2760 status: ActorStatus::Idle,
2761 stats: ActorStats { num_processed_messages: 1 },
2762 handlers: HashMap::new(),
2763 children: hashmap!{
2764 root_1_1.actor_id().pid() =>
2765 ActorTreeSnapshot::empty_typed(
2766 root_1_1.actor_id().pid(),
2767 "hyperactor::proc::tests::TestActor".to_string()
2768 )
2769 },
2770 events: Vec::new(),
2771 spans: Vec::new(),
2772 },
2773 },
2774 events: Vec::new(),
2775 spans: Vec::new(),
2776 },
2777 }
2778 );
2779 }
2780
2781 root_1.drain_and_stop().unwrap();
2783 root_1.await;
2784 wait_until_idle(&root).await;
2786 {
2787 let snapshot = proc.state().ledger.snapshot();
2788 assert_eq!(
2789 snapshot.roots,
2790 hashmap! {
2791 root.actor_id().clone() => ActorTreeSnapshot {
2792 pid: 0,
2793 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2794 status: ActorStatus::Idle,
2795 stats: ActorStats { num_processed_messages: 3 },
2796 handlers: HashMap::new(),
2797 children: hashmap!{
2798 root_2.actor_id().pid() =>
2799 ActorTreeSnapshot {
2800 pid: root_2.actor_id().pid(),
2801 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2802 status: ActorStatus::Idle,
2803 stats: ActorStats::default(),
2804 handlers: HashMap::new(),
2805 children: HashMap::new(),
2806 events: Vec::new(),
2807 spans: Vec::new(),
2808 }
2809 },
2810 events: Vec::new(),
2811 spans: Vec::new(),
2812 },
2813 }
2814 );
2815 }
2816
2817 root.drain_and_stop().unwrap();
2819 root.await;
2820 {
2821 let snapshot = proc.state().ledger.snapshot();
2822 assert_eq!(snapshot.roots, hashmap! {});
2823 }
2824 }
2825
2826 #[async_timed_test(timeout_secs = 30)]
2827 async fn test_multi_handler() {
2828 #[derive(Debug)]
2832 struct TestActor(Arc<AtomicUsize>);
2833
2834 #[async_trait]
2835 impl Actor for TestActor {}
2836
2837 #[async_trait]
2838 impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
2839 async fn handle(
2840 &mut self,
2841 cx: &crate::Context<Self>,
2842 message: OncePortHandle<PortHandle<usize>>,
2843 ) -> anyhow::Result<()> {
2844 message.send(cx.port())?;
2845 Ok(())
2846 }
2847 }
2848
2849 #[async_trait]
2850 impl Handler<usize> for TestActor {
2851 async fn handle(
2852 &mut self,
2853 _cx: &crate::Context<Self>,
2854 message: usize,
2855 ) -> anyhow::Result<()> {
2856 self.0.fetch_add(message, Ordering::SeqCst);
2857 Ok(())
2858 }
2859 }
2860
2861 let proc = Proc::local();
2862 let state = Arc::new(AtomicUsize::new(0));
2863 let actor = TestActor(state.clone());
2864 let handle = proc.spawn::<TestActor>("test", actor).unwrap();
2865 let (client, _) = proc.instance("client").unwrap();
2866 let (tx, rx) = client.open_once_port();
2867 handle.send(&client, tx).unwrap();
2868 let usize_handle = rx.recv().await.unwrap();
2869 usize_handle.send(123).unwrap();
2870
2871 handle.drain_and_stop().unwrap();
2872 handle.await;
2873
2874 assert_eq!(state.load(Ordering::SeqCst), 123);
2875 }
2876
2877 #[async_timed_test(timeout_secs = 30)]
2878 async fn test_actor_panic() {
2879 panic_handler::set_panic_hook();
2881
2882 let proc = Proc::local();
2883 ProcSupervisionCoordinator::set(&proc).await.unwrap();
2886
2887 let (client, _handle) = proc.instance("client").unwrap();
2888 let actor_handle = proc.spawn("test", TestActor).unwrap();
2889 actor_handle
2890 .panic(&client, "some random failure".to_string())
2891 .await
2892 .unwrap();
2893 let actor_status = actor_handle.await;
2894
2895 assert_matches!(actor_status, ActorStatus::Failed(_));
2899 if let ActorStatus::Failed(err) = actor_status {
2900 let error_msg = err.to_string();
2901 assert!(error_msg.contains("some random failure"));
2903 assert!(error_msg.contains("rust_begin_unwind"));
2907 }
2908 }
2909
2910 #[async_timed_test(timeout_secs = 30)]
2911 async fn test_local_supervision_propagation() {
2912 hyperactor_telemetry::initialize_logging_for_test();
2913
2914 #[derive(Debug)]
2915 struct TestActor(Arc<AtomicBool>, bool);
2916
2917 #[async_trait]
2918 impl Actor for TestActor {
2919 async fn handle_supervision_event(
2920 &mut self,
2921 _this: &Instance<Self>,
2922 _event: &ActorSupervisionEvent,
2923 ) -> Result<bool, anyhow::Error> {
2924 if !self.1 {
2925 return Ok(false);
2926 }
2927
2928 tracing::error!(
2929 "{}: supervision event received: {:?}",
2930 _this.self_id(),
2931 _event
2932 );
2933 self.0.store(true, Ordering::SeqCst);
2934 Ok(true)
2935 }
2936 }
2937
2938 #[async_trait]
2939 impl Handler<String> for TestActor {
2940 async fn handle(
2941 &mut self,
2942 cx: &crate::Context<Self>,
2943 message: String,
2944 ) -> anyhow::Result<()> {
2945 tracing::info!("{} received message: {}", cx.self_id(), message);
2946 Err(anyhow::anyhow!(message))
2947 }
2948 }
2949
2950 let proc = Proc::local();
2951 let (client, _) = proc.instance("client").unwrap();
2952 let reported_event = ProcSupervisionCoordinator::set(&proc).await.unwrap();
2953
2954 let root_state = Arc::new(AtomicBool::new(false));
2955 let root_1_state = Arc::new(AtomicBool::new(false));
2956 let root_1_1_state = Arc::new(AtomicBool::new(false));
2957 let root_1_1_1_state = Arc::new(AtomicBool::new(false));
2958 let root_2_state = Arc::new(AtomicBool::new(false));
2959 let root_2_1_state = Arc::new(AtomicBool::new(false));
2960
2961 let root = proc
2962 .spawn::<TestActor>("root", TestActor(root_state.clone(), false))
2963 .unwrap();
2964 let root_1 = proc
2965 .spawn_child::<TestActor>(
2966 root.cell().clone(),
2967 TestActor(
2968 root_1_state.clone(),
2969 true, ),
2971 )
2972 .unwrap();
2973 let root_1_1 = proc
2974 .spawn_child::<TestActor>(
2975 root_1.cell().clone(),
2976 TestActor(root_1_1_state.clone(), false),
2977 )
2978 .unwrap();
2979 let root_1_1_1 = proc
2980 .spawn_child::<TestActor>(
2981 root_1_1.cell().clone(),
2982 TestActor(root_1_1_1_state.clone(), false),
2983 )
2984 .unwrap();
2985 let root_2 = proc
2986 .spawn_child::<TestActor>(root.cell().clone(), TestActor(root_2_state.clone(), false))
2987 .unwrap();
2988 let root_2_1 = proc
2989 .spawn_child::<TestActor>(
2990 root_2.cell().clone(),
2991 TestActor(root_2_1_state.clone(), false),
2992 )
2993 .unwrap();
2994
2995 root_1_1_1
2998 .send::<String>(&client, "some random failure".into())
2999 .unwrap();
3000
3001 root_2_1
3004 .send::<String>(&client, "some random failure".into())
3005 .unwrap();
3006
3007 RealClock.sleep(Duration::from_secs(1)).await;
3008
3009 assert!(!root_state.load(Ordering::SeqCst));
3010 assert!(root_1_state.load(Ordering::SeqCst));
3011 assert!(!root_1_1_state.load(Ordering::SeqCst));
3012 assert!(!root_1_1_1_state.load(Ordering::SeqCst));
3013 assert!(!root_2_state.load(Ordering::SeqCst));
3014 assert!(!root_2_1_state.load(Ordering::SeqCst));
3015 assert_eq!(
3016 reported_event.event().map(|e| e.actor_id.clone()),
3017 Some(root_2_1.actor_id().clone())
3018 );
3019 }
3020
3021 #[async_timed_test(timeout_secs = 30)]
3022 async fn test_instance() {
3023 #[derive(Debug, Default)]
3024 struct TestActor;
3025
3026 impl Actor for TestActor {}
3027
3028 #[async_trait]
3029 impl Handler<(String, PortRef<String>)> for TestActor {
3030 async fn handle(
3031 &mut self,
3032 cx: &crate::Context<Self>,
3033 (message, port): (String, PortRef<String>),
3034 ) -> anyhow::Result<()> {
3035 port.send(cx, message)?;
3036 Ok(())
3037 }
3038 }
3039
3040 let proc = Proc::local();
3041
3042 let (instance, handle) = proc.instance("my_test_actor").unwrap();
3043
3044 let child_actor = TestActor.spawn(&instance).unwrap();
3045
3046 let (port, mut receiver) = instance.open_port();
3047 child_actor
3048 .send(&instance, ("hello".to_string(), port.bind()))
3049 .unwrap();
3050
3051 let message = receiver.recv().await.unwrap();
3052 assert_eq!(message, "hello");
3053
3054 child_actor.drain_and_stop().unwrap();
3055 child_actor.await;
3056
3057 assert_eq!(*handle.status().borrow(), ActorStatus::Client);
3058 drop(instance);
3059 assert_eq!(*handle.status().borrow(), ActorStatus::Stopped);
3060 handle.await;
3061 }
3062
3063 #[tokio::test]
3064 async fn test_proc_terminate_without_coordinator() {
3065 if std::env::var("CARGO_TEST").is_ok() {
3066 eprintln!("test skipped as it hangs when run by cargo in sandcastle");
3067 return;
3068 }
3069
3070 let process = async {
3071 let proc = Proc::local();
3072 let root = proc.spawn("root", TestActor).unwrap();
3076 let (client, _handle) = proc.instance("client").unwrap();
3077 root.fail(&client, anyhow::anyhow!("some random failure"))
3078 .await
3079 .unwrap();
3080 RealClock.sleep(Duration::from_secs(30)).await;
3084 };
3085
3086 assert_termination(|| process, 1).await.unwrap();
3087 }
3088
3089 fn trace_and_block(fut: impl Future) {
3090 tracing::subscriber::with_default(
3091 tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
3092 || {
3093 tokio::runtime::Builder::new_current_thread()
3094 .enable_all()
3095 .build()
3096 .unwrap()
3097 .block_on(fut)
3098 },
3099 );
3100 }
3101
    /// Checks two things about the actor cell's `recording`:
    /// - it captures tracing events emitted inside handlers;
    /// - it exposes spans entered inside a handler through `stacks()`.
    ///
    /// Ignored until trace recording is turned back on.
    #[ignore = "until trace recording is turned back on"]
    #[test]
    fn test_handler_logging() {
        // Actor whose handlers either emit tracing events or block on barriers,
        // so the test can inspect its recording at controlled points.
        #[derive(Debug, Default)]
        struct LoggingActor;

        impl Actor for LoggingActor {}

        impl LoggingActor {
            // Sends a two-party barrier to the actor and waits on it together
            // with the actor's `Arc<Barrier>` handler. NOTE(review): the test
            // relies on this meaning all earlier messages were handled, which
            // assumes in-order mailbox processing — confirm.
            async fn wait(cx: &impl context::Actor, handle: &ActorHandle<Self>) {
                let barrier = Arc::new(Barrier::new(2));
                handle.send(cx, barrier.clone()).unwrap();
                barrier.wait().await;
            }
        }

        // Emits an INFO event whose message is the received string.
        #[async_trait]
        impl Handler<String> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: String,
            ) -> anyhow::Result<()> {
                tracing::info!("{}", message);
                Ok(())
            }
        }

        // Emits an INFO event with a `number` field.
        #[async_trait]
        impl Handler<u64> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: u64,
            ) -> anyhow::Result<()> {
                tracing::event!(Level::INFO, number = message);
                Ok(())
            }
        }

        // Rendezvous with `LoggingActor::wait`.
        #[async_trait]
        impl Handler<Arc<Barrier>> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: Arc<Barrier>,
            ) -> anyhow::Result<()> {
                message.wait().await;
                Ok(())
            }
        }

        // Enters `child_span` and then blocks between two barriers. Between
        // them, the test snapshots the recorded span stacks.
        // NOTE(review): the `enter()` guard is held across `.await` points,
        // which the tracing docs warn against in async code. The assertions
        // below appear to rely on the span staying entered while this handler
        // is suspended — confirm before switching to `Instrument`.
        #[async_trait]
        impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                barriers: Arc<(Barrier, Barrier)>,
            ) -> anyhow::Result<()> {
                let inner = tracing::span!(Level::INFO, "child_span");
                let _inner_guard = inner.enter();
                barriers.0.wait().await;
                barriers.1.wait().await;
                Ok(())
            }
        }

        trace_and_block(async {
            let proc = Proc::local();
            let (client, _) = proc.instance("client").unwrap();
            let handle = LoggingActor.spawn_detached().unwrap();
            // Emit three events: two string messages and one numeric field.
            handle.send(&client, "hello world".to_string()).unwrap();
            handle
                .send(&client, "hello world again".to_string())
                .unwrap();
            handle.send(&client, 123u64).unwrap();

            // Wait until the actor reaches the barrier message sent after them.
            LoggingActor::wait(&client, &handle).await;

            let events = handle.cell().inner.recording.tail();
            assert_eq!(events.len(), 3);
            assert_eq!(events[0].json_value(), json!({ "message": "hello world" }));
            assert_eq!(
                events[1].json_value(),
                json!({ "message": "hello world again" })
            );
            assert_eq!(events[2].json_value(), json!({ "number": 123 }));

            // Snapshot the span stacks while the handler is parked between
            // its two barriers (i.e. after it has entered `child_span`).
            let stacks = {
                let barriers = Arc::new((Barrier::new(2), Barrier::new(2)));
                handle.send(&client, Arc::clone(&barriers)).unwrap();
                barriers.0.wait().await;
                let stacks = handle.cell().inner.recording.stacks();
                barriers.1.wait().await;
                stacks
            };
            assert_eq!(stacks.len(), 1);
            assert_eq!(stacks[0].len(), 1);
            assert_eq!(stacks[0][0].name(), "child_span");
        })
    }
3203}