1use std::any::Any;
15use std::any::TypeId;
16use std::collections::HashMap;
17use std::fmt;
18use std::future::Future;
19use std::hash::Hash;
20use std::hash::Hasher;
21use std::ops::Deref;
22use std::panic;
23use std::panic::AssertUnwindSafe;
24use std::pin::Pin;
25use std::sync::Arc;
26use std::sync::OnceLock;
27use std::sync::Weak;
28use std::sync::atomic::AtomicU64;
29use std::sync::atomic::AtomicUsize;
30use std::sync::atomic::Ordering;
31use std::time::Duration;
32use std::time::SystemTime;
33
34use async_trait::async_trait;
35use dashmap::DashMap;
36use dashmap::mapref::entry::Entry;
37use dashmap::mapref::multiple::RefMulti;
38use futures::FutureExt;
39use hyperactor_macros::AttrValue;
40use hyperactor_macros::Named;
41use hyperactor_telemetry::recorder;
42use hyperactor_telemetry::recorder::Recording;
43use serde::Deserialize;
44use serde::Serialize;
45use tokio::sync::mpsc;
46use tokio::sync::watch;
47use tokio::task::JoinHandle;
48use tracing::Instrument;
49use uuid::Uuid;
50
51use crate as hyperactor;
52use crate::Actor;
53use crate::ActorRef;
54use crate::Handler;
55use crate::Message;
56use crate::RemoteMessage;
57use crate::actor::ActorError;
58use crate::actor::ActorErrorKind;
59use crate::actor::ActorHandle;
60use crate::actor::ActorStatus;
61use crate::actor::Binds;
62use crate::actor::Referable;
63use crate::actor::RemoteHandles;
64use crate::actor::Signal;
65use crate::attrs::Attrs;
66use crate::channel;
67use crate::channel::ChannelAddr;
68use crate::channel::ChannelError;
69use crate::clock::Clock;
70use crate::clock::ClockKind;
71use crate::clock::RealClock;
72use crate::config;
73use crate::context;
74use crate::data::Serialized;
75use crate::data::TypeInfo;
76use crate::declare_attrs;
77use crate::mailbox::BoxedMailboxSender;
78use crate::mailbox::DeliveryError;
79use crate::mailbox::DialMailboxRouter;
80use crate::mailbox::IntoBoxedMailboxSender as _;
81use crate::mailbox::Mailbox;
82use crate::mailbox::MailboxMuxer;
83use crate::mailbox::MailboxSender;
84use crate::mailbox::MailboxServer as _;
85use crate::mailbox::MessageEnvelope;
86use crate::mailbox::OncePortHandle;
87use crate::mailbox::OncePortReceiver;
88use crate::mailbox::PanickingMailboxSender;
89use crate::mailbox::PortHandle;
90use crate::mailbox::PortReceiver;
91use crate::mailbox::Undeliverable;
92use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
93use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
94use crate::metrics::ACTOR_MESSAGES_RECEIVED;
95use crate::ordering::OrderedSender;
96use crate::ordering::OrderedSenderError;
97use crate::ordering::Sequencer;
98use crate::ordering::ordered_channel;
99use crate::panic_handler;
100use crate::reference::ActorId;
101use crate::reference::Index;
102use crate::reference::PortId;
103use crate::reference::ProcId;
104use crate::reference::id;
105use crate::supervision::ActorSupervisionEvent;
106
/// Source of the rank used by [`Proc::local`]: every call takes the current
/// value for its proc id and bumps the counter by one.
static NEXT_LOCAL_RANK: AtomicUsize = AtomicUsize::new(0);
109
/// Handle to a proc.
///
/// The handle only wraps an `Arc<ProcState>`, so cloning it is cheap and all
/// clones observe the same underlying state.
#[derive(Clone, Debug)]
pub struct Proc {
    /// Shared state of the proc.
    inner: Arc<ProcState>,
}
120
/// State shared by every clone of a [`Proc`].
#[derive(Debug)]
struct ProcState {
    /// Identifier of this proc.
    proc_id: ProcId,

    /// Muxer holding the mailboxes bound on this proc; envelopes whose
    /// destination is this proc are posted here.
    proc_muxer: MailboxMuxer,

    /// Sender used for envelopes whose destination is a different proc.
    forwarder: BoxedMailboxSender,

    /// Root actor names already allocated on this proc. The value is the
    /// counter from which pids of children created under that name are drawn.
    roots: DashMap<String, AtomicUsize>,

    /// Ledger of root actors; used for snapshots and to stop or abort roots.
    ledger: ActorLedger,

    /// Weak references to instance cells keyed by actor id; consulted when an
    /// actor reference is resolved to a local handle.
    instances: DashMap<ActorId, WeakInstanceCell>,

    /// Port that receives supervision events from actors that have no parent.
    /// It can be set at most once.
    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,

    /// Clock exposed to the actors of this proc.
    clock: ClockKind,
}
150
/// Point-in-time view of the actor ledger.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActorLedgerSnapshot {
    /// One tree snapshot per root actor that was still alive when the
    /// snapshot was taken, keyed by the root's actor id.
    pub roots: HashMap<ActorId, ActorTreeSnapshot>,
}
158
/// Serializable copy of a [`recorder::Event`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Event {
    /// Timestamp copied from the recorded event.
    pub time: SystemTime,
    /// Field name/value pairs of the recorded event.
    pub fields: Vec<(String, recorder::Value)>,
    /// Sequence number copied from the recorded event.
    pub seq: usize,
}
169
170impl From<recorder::Event> for Event {
171 fn from(event: recorder::Event) -> Event {
172 Event {
173 time: event.time,
174 fields: event.fields(),
175 seq: event.seq,
176 }
177 }
178}
179
/// Snapshot of one actor together with, recursively, all of its children.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActorTreeSnapshot {
    /// Pid component of the actor's id.
    pub pid: Index,

    /// Name of the actor's type.
    pub type_name: String,

    /// Status of the actor at the time of the snapshot.
    pub status: ActorStatus,

    /// Counters collected for the actor.
    pub stats: ActorStats,

    /// Exported named ports of the actor: key and textual description.
    pub handlers: HashMap<u64, String>,

    /// Snapshots of the actor's children, keyed by child pid.
    pub children: HashMap<Index, ActorTreeSnapshot>,

    /// Tail of the events recorded for the actor.
    pub events: Vec<Event>,

    /// Recorded span stacks; each stack is a list of span names.
    pub spans: Vec<Vec<String>>,
}
210
211impl Hash for ActorTreeSnapshot {
212 fn hash<H: Hasher>(&self, state: &mut H) {
213 self.pid.hash(state);
214 }
215}
216
/// Counters describing the activity of a single actor.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Default)]
pub struct ActorStats {
    /// Value of the actor's processed-message counter when the stats were
    /// captured.
    num_processed_messages: u64,
}
224
225impl fmt::Display for ActorStats {
226 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227 write!(f, "num_processed_messages={}", self.num_processed_messages)
228 }
229}
230
/// Registry of the root actors spawned on a proc.
#[derive(Debug)]
struct ActorLedger {
    /// Weak references to the root actors' cells, keyed by actor id. An entry
    /// whose cell can no longer be upgraded is skipped by `snapshot`.
    roots: DashMap<ActorId, WeakInstanceCell>,
}
236
237impl ActorLedger {
238 fn new() -> Self {
239 Self {
240 roots: DashMap::new(),
241 }
242 }
243
244 fn insert(
245 &self,
246 root_actor_id: ActorId,
247 root_actor_cell: WeakInstanceCell,
248 ) -> Result<(), anyhow::Error> {
249 match self.roots.insert(root_actor_id.clone(), root_actor_cell) {
250 None => Ok(()),
251 Some(current_cell) => {
254 let debugging_msg = match current_cell.upgrade() {
255 Some(cell) => format!("the stored cell's actor ID is {}", cell.actor_id()),
256 None => "the stored cell has been dropped".to_string(),
257 };
258
259 Err(anyhow::anyhow!(
260 "actor '{root_actor_id}' has already been added to ledger: {debugging_msg}"
261 ))
262 }
263 }
264 }
265
266 fn snapshot(&self) -> ActorLedgerSnapshot {
268 let roots = self
269 .roots
270 .iter()
271 .flat_map(|r| {
272 let (actor_id, weak_cell) = r.pair();
273 weak_cell
277 .upgrade()
278 .map(|cell| (actor_id.clone(), Self::get_actor_tree_snapshot(&cell)))
279 })
280 .collect();
281
282 ActorLedgerSnapshot { roots }
283 }
284
285 fn get_actor_tree_snapshot(cell: &InstanceCell) -> ActorTreeSnapshot {
286 let children = cell
288 .child_iter()
289 .map(|child| (child.pid(), Self::get_actor_tree_snapshot(child.value())))
290 .collect();
291
292 ActorTreeSnapshot {
293 pid: cell.actor_id().pid(),
294 type_name: cell.inner.actor_type.type_name().to_string(),
295 status: cell.status().borrow().clone(),
296 stats: ActorStats {
297 num_processed_messages: cell.inner.num_processed_messages.load(Ordering::SeqCst),
298 },
299 handlers: cell
300 .inner
301 .exported_named_ports
302 .iter()
303 .map(|entry| (*entry.key(), entry.value().to_string()))
304 .collect(),
305 children,
306 events: cell
307 .inner
308 .recording
309 .tail()
310 .into_iter()
311 .map(Event::from)
312 .collect(),
313 spans: cell
314 .inner
315 .recording
316 .stacks()
317 .into_iter()
318 .map(|stack| {
319 stack
320 .into_iter()
321 .map(|meta| meta.name().to_string())
322 .collect()
323 })
324 .collect(),
325 }
326 }
327}
328
329impl Proc {
330 pub fn new(proc_id: ProcId, forwarder: BoxedMailboxSender) -> Self {
332 Self::new_with_clock(proc_id, forwarder, ClockKind::default())
333 }
334
335 pub async fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
337 let (addr, rx) = channel::serve(addr)?;
338 let proc_id = ProcId::Direct(addr, name);
339 let proc = Self::new(proc_id, DialMailboxRouter::new().into_boxed());
340 proc.clone().serve(rx);
341 Ok(proc)
342 }
343
344 pub fn direct_with_default(
346 addr: ChannelAddr,
347 name: String,
348 default: BoxedMailboxSender,
349 ) -> Result<Self, ChannelError> {
350 let (addr, rx) = channel::serve(addr)?;
351 let proc_id = ProcId::Direct(addr, name);
352 let proc = Self::new(
353 proc_id,
354 DialMailboxRouter::new_with_default(default).into_boxed(),
355 );
356 proc.clone().serve(rx);
357 Ok(proc)
358 }
359
360 pub fn new_with_clock(
362 proc_id: ProcId,
363 forwarder: BoxedMailboxSender,
364 clock: ClockKind,
365 ) -> Self {
366 Self {
367 inner: Arc::new(ProcState {
368 proc_id,
369 proc_muxer: MailboxMuxer::new(),
370 forwarder,
371 roots: DashMap::new(),
372 ledger: ActorLedger::new(),
373 instances: DashMap::new(),
374 supervision_coordinator_port: OnceLock::new(),
375 clock,
376 }),
377 }
378 }
379
380 pub fn set_supervision_coordinator(
383 &self,
384 port: PortHandle<ActorSupervisionEvent>,
385 ) -> Result<(), anyhow::Error> {
386 self.state()
387 .supervision_coordinator_port
388 .set(port)
389 .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
390 }
391
392 fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
393 let result = match self.state().supervision_coordinator_port.get() {
394 Some(port) => port.send(event).map_err(anyhow::Error::from),
395 None => Err(anyhow::anyhow!(
396 "coordinator port is not set for proc {}",
397 self.proc_id()
398 )),
399 };
400 if let Err(err) = result {
401 tracing::error!(
402 "proc {}: could not propagate supervision event: {:?}: crashing",
403 self.proc_id(),
404 err
405 );
406
407 std::process::exit(1);
408 }
409 }
410
411 pub fn local() -> Self {
414 let proc_id = ProcId::Ranked(id!(local), NEXT_LOCAL_RANK.fetch_add(1, Ordering::Relaxed));
417 Proc::new(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
419 }
420
421 pub fn proc_id(&self) -> &ProcId {
423 &self.state().proc_id
424 }
425
/// Returns the sender this proc uses for envelopes whose destination is a
/// different proc.
pub fn forwarder(&self) -> &BoxedMailboxSender {
    &self.inner.forwarder
}
431
432 fn state(&self) -> &ProcState {
434 self.inner.as_ref()
435 }
436
437 pub fn clock(&self) -> &ClockKind {
439 &self.state().clock
440 }
441
442 pub fn ledger_snapshot(&self) -> ActorLedgerSnapshot {
444 self.state().ledger.snapshot()
445 }
446
447 pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
449 let actor_id: ActorId = self.allocate_root_id(name)?;
450 Ok(self.bind_mailbox(actor_id))
451 }
452
453 pub fn attach_child(&self, parent_id: &ActorId) -> Result<Mailbox, anyhow::Error> {
455 let actor_id: ActorId = self.allocate_child_id(parent_id)?;
456 Ok(self.bind_mailbox(actor_id))
457 }
458
459 fn bind_mailbox(&self, actor_id: ActorId) -> Mailbox {
461 let mbox = Mailbox::new(actor_id, BoxedMailboxSender::new(self.downgrade()));
462
463 self.state().proc_muxer.bind_mailbox(mbox.clone());
466 mbox
467 }
468
469 pub fn attach_actor<R, M>(
472 &self,
473 name: &str,
474 ) -> Result<(Instance<()>, ActorRef<R>, PortReceiver<M>), anyhow::Error>
475 where
476 M: RemoteMessage,
477 R: Referable + RemoteHandles<M>,
478 {
479 let (instance, _handle) = self.instance(name)?;
480 let (handle, rx) = instance.open_port::<M>();
481 handle.bind_to(M::port());
482 let actor_ref = ActorRef::attest(instance.self_id().clone());
483 Ok((instance, actor_ref, rx))
484 }
485
486 #[hyperactor::observe_result("Proc")]
489 pub async fn spawn<A: Actor>(
490 &self,
491 name: &str,
492 params: A::Params,
493 ) -> Result<ActorHandle<A>, anyhow::Error> {
494 let actor_id = self.allocate_root_id(name)?;
495 let _ = tracing::debug_span!(
496 "spawn_actor",
497 actor_name = name,
498 actor_type = std::any::type_name::<A>(),
499 actor_id = actor_id.to_string(),
500 );
501 let (instance, mut actor_loop_receivers, work_rx) =
502 Instance::new(self.clone(), actor_id.clone(), false, None);
503 let actor = A::new(params).await?;
504 self.state()
508 .ledger
509 .insert(actor_id.clone(), instance.cell.downgrade())?;
510
511 instance
512 .start(actor, actor_loop_receivers.take().unwrap(), work_rx)
513 .await
514 }
515
516 pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
522 let actor_id = self.allocate_root_id(name)?;
523 let _ = tracing::debug_span!(
524 "actor_instance",
525 actor_name = name,
526 actor_type = std::any::type_name::<()>(),
527 actor_id = actor_id.to_string(),
528 );
529
530 let (instance, _, _) = Instance::new(self.clone(), actor_id.clone(), true, None);
531 let handle = ActorHandle::new(instance.cell.clone(), instance.ports.clone());
532
533 instance.change_status(ActorStatus::Client);
534
535 Ok((instance, handle))
536 }
537
538 fn child_instance(
540 &self,
541 parent: InstanceCell,
542 ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
543 let actor_id = self.allocate_child_id(parent.actor_id())?;
544 let _ = tracing::debug_span!(
545 "child_actor_instance",
546 parent_actor_id = %parent.actor_id(),
547 actor_type = std::any::type_name::<()>(),
548 actor_id = %actor_id,
549 );
550
551 let (instance, _, _) = Instance::new(self.clone(), actor_id, false, Some(parent));
552 let handle = ActorHandle::new(instance.cell.clone(), instance.ports.clone());
553 instance.change_status(ActorStatus::Client);
554 Ok((instance, handle))
555 }
556
557 async fn spawn_child<A: Actor>(
563 &self,
564 parent: InstanceCell,
565 params: A::Params,
566 ) -> Result<ActorHandle<A>, anyhow::Error> {
567 let actor_id = self.allocate_child_id(parent.actor_id())?;
568 let (instance, mut actor_loop_receivers, work_rx) =
569 Instance::new(self.clone(), actor_id, false, Some(parent.clone()));
570 let actor = A::new(params).await?;
571 instance
572 .start(actor, actor_loop_receivers.take().unwrap(), work_rx)
573 .await
574 }
575
576 pub fn abort_root_actor(
580 &self,
581 root: &ActorId,
582 this_handle: Option<&JoinHandle<()>>,
583 ) -> Option<impl Future<Output = ActorId>> {
584 self.state()
585 .ledger
586 .roots
587 .get(root)
588 .into_iter()
589 .flat_map(|e| e.upgrade())
590 .map(|cell| {
591 let r1 = root.clone();
592 let r2 = root.clone();
593 let skip_abort = this_handle.is_some_and(|this_h| {
595 cell.inner
596 .actor_task_handle
597 .get()
598 .is_some_and(|other_h| std::ptr::eq(this_h, other_h))
599 });
600 async move {
606 tokio::task::spawn_blocking(move || {
607 if !skip_abort {
608 let h = cell.inner.actor_task_handle.wait();
609 tracing::debug!("{}: aborting {:?}", r1, h);
610 h.abort();
611 }
612 })
613 .await
614 .unwrap();
615 r2
616 }
617 })
618 .next()
619 }
620
621 pub fn stop_actor(&self, actor_id: &ActorId) -> Option<watch::Receiver<ActorStatus>> {
624 if let Some(entry) = self.state().ledger.roots.get(actor_id) {
625 match entry.value().upgrade() {
626 None => None, Some(cell) => {
628 tracing::info!("sending stop signal to {}", cell.actor_id());
629 if let Err(err) = cell.signal(Signal::DrainAndStop) {
630 tracing::error!(
631 "{}: failed to send stop signal to pid {}: {:?}",
632 self.proc_id(),
633 cell.pid(),
634 err
635 );
636 None
637 } else {
638 Some(cell.status().clone())
639 }
640 }
641 }
642 } else {
643 tracing::error!("no actor {} found in {} roots", actor_id, self.proc_id());
644 None
645 }
646 }
647
/// Stops every root actor of this proc, waiting up to `timeout` for each to
/// stop and aborting those that do not.
///
/// `cx` is the context of the calling actor, if the call is made from inside
/// one. The caller is not waited for; its own task is aborted last.
///
/// Returns `(stopped, aborted)`: the roots observed in `ActorStatus::Stopped`
/// within the timeout, and every other root a stop signal was sent to (this
/// includes the calling actor, which is never waited for).
///
/// # Panics
///
/// Panics when `cx` is given but the calling actor's task handle has not
/// been stored yet.
#[hyperactor::instrument]
pub async fn destroy_and_wait<A: Actor>(
    &mut self,
    timeout: Duration,
    cx: Option<&Context<'_, A>>,
) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
    tracing::debug!("{}: proc stopping", self.proc_id());

    // Task handle and id of the calling actor, when called from an actor.
    let (this_handle, this_actor_id) = cx.map_or((None, None), |cx| {
        (
            Some(cx.actor_task_handle().expect("cannot call destroy_and_wait from inside an actor unless actor has finished starting")),
            Some(cx.self_id())
        )
    });

    // Send the stop signal to every root; keep a status receiver for each
    // root that accepted the signal. The keys are collected first so that no
    // ledger entry is borrowed while `stop_actor` runs.
    let mut statuses = HashMap::new();
    for actor_id in self
        .state()
        .ledger
        .roots
        .iter()
        .map(|entry| entry.key().clone())
        .collect::<Vec<_>>()
    {
        if let Some(status) = self.stop_actor(&actor_id) {
            statuses.insert(actor_id, status);
        }
    }
    tracing::debug!("{}: proc stopped", self.proc_id());

    // Wait (with the real clock, not the proc's clock) for each root other
    // than the caller to report `Stopped`. A timeout or closed channel yields
    // `None` for that root.
    let waits: Vec<_> = statuses
        .iter_mut()
        .filter(|(actor_id, _)| Some(*actor_id) != this_actor_id)
        .map(|(actor_id, root)| {
            let actor_id = actor_id.clone();
            async move {
                RealClock
                    .timeout(
                        timeout,
                        root.wait_for(|state: &ActorStatus| {
                            matches!(*state, ActorStatus::Stopped)
                        }),
                    )
                    .await
                    .ok()
                    .map(|_| actor_id)
            }
        })
        .collect();

    let results = futures::future::join_all(waits).await;
    let stopped_actors: Vec<_> = results
        .iter()
        .filter_map(|actor_id| actor_id.as_ref())
        .cloned()
        .collect();
    // Everything that did not stop in time is aborted. The caller ends up in
    // this set too, but `abort_root_actor` skips it thanks to `this_handle`.
    let aborted_actors: Vec<_> = statuses
        .iter()
        .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
        .map(|(actor_id, _)| {
            let f = self.abort_root_actor(actor_id, this_handle);
            async move {
                let _ = if let Some(f) = f { Some(f.await) } else { None };
                actor_id.clone()
            }
        })
        .collect();
    let aborted_actors = futures::future::join_all(aborted_actors).await;

    // Abort the calling actor's own task only after all others are handled.
    if let Some(this_handle) = this_handle
        && let Some(this_actor_id) = this_actor_id
    {
        tracing::debug!("{}: aborting (delayed) {:?}", this_actor_id, this_handle);
        this_handle.abort()
    };

    tracing::info!(
        "destroy_and_wait: {} actors stopped, {} actors aborted",
        stopped_actors.len(),
        aborted_actors.len()
    );
    Ok((stopped_actors, aborted_actors))
}
744
745 pub fn resolve_actor_ref<R: Actor + Referable>(
754 &self,
755 actor_ref: &ActorRef<R>,
756 ) -> Option<ActorHandle<R>> {
757 self.inner
758 .instances
759 .get(actor_ref.actor_id())?
760 .upgrade()?
761 .downcast_handle()
762 }
763
764 #[hyperactor::instrument(fields(actor_name=name))]
766 fn allocate_root_id(&self, name: &str) -> Result<ActorId, anyhow::Error> {
767 let name = name.to_string();
768 match self.state().roots.entry(name.to_string()) {
769 Entry::Vacant(entry) => {
770 entry.insert(AtomicUsize::new(1));
771 }
772 Entry::Occupied(_) => {
773 anyhow::bail!("an actor with name '{}' has already been spawned", name)
774 }
775 }
776 Ok(ActorId(self.state().proc_id.clone(), name.to_string(), 0))
777 }
778
779 #[hyperactor::instrument(fields(actor_name=parent_id.name()))]
781 pub(crate) fn allocate_child_id(&self, parent_id: &ActorId) -> Result<ActorId, anyhow::Error> {
782 assert_eq!(*parent_id.proc_id(), self.state().proc_id);
783 let pid = match self.state().roots.get(parent_id.name()) {
784 None => anyhow::bail!(
785 "no actor named {} in proc {}",
786 parent_id.name(),
787 self.state().proc_id
788 ),
789 Some(next_pid) => next_pid.fetch_add(1, Ordering::Relaxed),
790 };
791 Ok(parent_id.child_id(pid))
792 }
793
794 fn downgrade(&self) -> WeakProc {
795 WeakProc::new(self)
796 }
797}
798
799#[async_trait]
800impl MailboxSender for Proc {
801 fn post_unchecked(
802 &self,
803 envelope: MessageEnvelope,
804 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
805 ) {
806 if envelope.dest().actor_id().proc_id() == &self.state().proc_id {
807 self.state().proc_muxer.post(envelope, return_handle)
808 } else {
809 self.state().forwarder.post(envelope, return_handle)
810 }
811 }
812}
813
/// Weak counterpart of [`Proc`]: refers to the proc's state without keeping
/// it alive.
#[derive(Debug)]
struct WeakProc(Weak<ProcState>);
816
817impl WeakProc {
818 fn new(proc: &Proc) -> Self {
819 Self(Arc::downgrade(&proc.inner))
820 }
821
822 fn upgrade(&self) -> Option<Proc> {
823 self.0.upgrade().map(|inner| Proc { inner })
824 }
825}
826
827#[async_trait]
828impl MailboxSender for WeakProc {
829 fn post_unchecked(
830 &self,
831 envelope: MessageEnvelope,
832 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
833 ) {
834 match self.upgrade() {
835 Some(proc) => proc.post(envelope, return_handle),
836 None => envelope.undeliverable(
837 DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
838 return_handle,
839 ),
840 }
841 }
842}
843
/// A boxed, one-shot unit of work for an actor of type `A`.
///
/// The closure receives mutable access to the actor and its instance and
/// returns the future that performs the work; the future may borrow both for
/// its whole lifetime `'a`.
struct WorkCell<A: Actor + Send>(
    Box<
        dyn for<'a> FnOnce(
                &'a mut A,
                &'a mut Instance<A>,
            )
                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
            + Send
            + Sync,
    >,
);
857
858impl<A: Actor + Send> WorkCell<A> {
859 fn new(
861 f: impl for<'a> FnOnce(
862 &'a mut A,
863 &'a mut Instance<A>,
864 )
865 -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
866 + Send
867 + Sync
868 + 'static,
869 ) -> Self {
870 Self(Box::new(f))
871 }
872
873 fn handle<'a>(
875 self,
876 actor: &'a mut A,
877 instance: &'a mut Instance<A>,
878 ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
879 (self.0)(actor, instance)
880 }
881}
882
/// Context handed to a message handler: a borrow of the actor's instance
/// plus the headers of the message being handled.
pub struct Context<'a, A: Actor> {
    /// The instance of the actor handling the message.
    instance: &'a Instance<A>,
    /// Headers that accompanied the message.
    headers: Attrs,
}
888
889impl<'a, A: Actor> Context<'a, A> {
890 pub fn new(instance: &'a Instance<A>, headers: Attrs) -> Self {
892 Self { instance, headers }
893 }
894
895 pub fn headers(&self) -> &Attrs {
897 &self.headers
898 }
899}
900
/// Lets a context be used wherever a reference to the underlying
/// [`Instance`] is expected.
impl<A: Actor> Deref for Context<'_, A> {
    type Target = Instance<A>;

    fn deref(&self) -> &Self::Target {
        self.instance
    }
}
908
/// The runtime side of an actor of type `A`: it owns the actor's mailbox and
/// ports and publishes the actor's status.
pub struct Instance<A: Actor> {
    /// The proc this instance belongs to.
    proc: Proc,

    /// Shared cell of the instance; handles are created from clones of it.
    cell: InstanceCell,

    /// The mailbox of the instance, bound to the proc's muxer on creation.
    mailbox: Mailbox,

    /// Ports through which messages for the actor's handlers are delivered.
    ports: Arc<Ports<A>>,

    /// Sending half of the watch channel carrying the actor's status.
    status_tx: watch::Sender<ActorStatus>,

    /// Unique id generated when the instance is created.
    id: Uuid,

    /// Sequencer created from the instance id.
    sequencer: Sequencer,
}
933
934impl<A: Actor> Instance<A> {
935 fn new(
937 proc: Proc,
938 actor_id: ActorId,
939 detached: bool,
940 parent: Option<InstanceCell>,
941 ) -> (
942 Self,
943 Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,
944 mpsc::UnboundedReceiver<WorkCell<A>>,
945 ) {
946 let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
948 let (work_tx, work_rx) = ordered_channel(
949 actor_id.to_string(),
950 config::global::get(config::ENABLE_CLIENT_SEQ_ASSIGNMENT),
951 );
952 let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
953 proc.state().proc_muxer.bind_mailbox(mailbox.clone());
954 let (status_tx, status_rx) = watch::channel(ActorStatus::Created);
955
956 let actor_type = match TypeInfo::of::<A>() {
957 Some(info) => ActorType::Named(info),
958 None => ActorType::Anonymous(std::any::type_name::<A>()),
959 };
960 let actor_loop_ports = if detached {
961 None
962 } else {
963 let (signal_port, signal_receiver) = ports.open_message_port().unwrap();
964 let (supervision_port, supervision_receiver) = mailbox.open_port();
965 Some((
966 (signal_port, supervision_port),
967 (signal_receiver, supervision_receiver),
968 ))
969 };
970
971 let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();
972
973 let cell = InstanceCell::new(
974 actor_id,
975 actor_type,
976 proc.clone(),
977 actor_loop,
978 status_rx,
979 parent,
980 ports.clone(),
981 );
982 let instance_id = Uuid::now_v7();
983 (
984 Self {
985 proc,
986 cell,
987 mailbox,
988 ports,
989 status_tx,
990 sequencer: Sequencer::new(instance_id),
991 id: instance_id,
992 },
993 actor_loop_receivers,
994 work_rx,
995 )
996 }
997
998 fn change_status(&self, new: ActorStatus) {
1001 self.status_tx.send_replace(new.clone());
1002 }
1003
/// Returns the actor id of this instance, as held by its mailbox.
pub fn self_id(&self) -> &ActorId {
    self.mailbox.actor_id()
}
1008
1009 #[allow(clippy::result_large_err)] pub fn stop(&self) -> Result<(), ActorError> {
1012 tracing::info!("Instance::stop called, {}", self.cell.actor_id());
1013 self.cell.signal(Signal::DrainAndStop)
1014 }
1015
/// Opens a new port for messages of type `M` on this instance's mailbox and
/// returns its handle and receiver.
pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
    self.mailbox.open_port()
}
1023
/// Opens a new once-port for messages of type `M` on this instance's mailbox
/// and returns its handle and receiver.
pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
    self.mailbox.open_once_port()
}
1030
/// Posts the serialized `message` with `headers` to `port_id`, delegating to
/// the `context::MailboxExt` implementation for this instance.
pub fn post(&self, port_id: PortId, headers: Attrs, message: Serialized) {
    <Self as context::MailboxExt>::post(self, port_id, headers, message)
}
1035
1036 #[allow(clippy::result_large_err)] pub fn self_message_with_delay<M>(&self, message: M, delay: Duration) -> Result<(), ActorError>
1039 where
1040 M: Message,
1041 A: Handler<M>,
1042 {
1043 let port = self.port();
1044 let self_id = self.self_id().clone();
1045 let clock = self.proc.state().clock.clone();
1046 tokio::spawn(async move {
1047 clock.non_advancing_sleep(delay).await;
1048 if let Err(e) = port.send(message) {
1049 tracing::info!("{}: error sending delayed message: {}", self_id, e);
1052 }
1053 });
1054 Ok(())
1055 }
1056
1057 #[hyperactor::instrument(fields(actor_id=self.cell.actor_id().clone().to_string(), actor_name=self.cell.actor_id().name()))]
1060 async fn start(
1061 self,
1062 actor: A,
1063 actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1064 work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
1065 ) -> Result<ActorHandle<A>, anyhow::Error> {
1066 let instance_cell = self.cell.clone();
1067 let actor_id = self.cell.actor_id().clone();
1068 let actor_handle = ActorHandle::new(self.cell.clone(), self.ports.clone());
1069 let actor_task_handle = A::spawn_server_task(panic_handler::with_backtrace_tracking(
1070 self.serve(actor, actor_loop_receivers, work_rx),
1071 ));
1072 tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
1073 instance_cell
1074 .inner
1075 .actor_task_handle
1076 .set(actor_task_handle)
1077 .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));
1078
1079 Ok(actor_handle)
1080 }
1081
1082 async fn serve(
1083 mut self,
1084 mut actor: A,
1085 actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1086 mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
1087 ) {
1088 let result = self
1094 .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
1095 .await;
1096
1097 let (actor_status, event) = match result {
1098 Ok(_) => (ActorStatus::Stopped, None),
1099 Err(ActorError {
1100 kind: ActorErrorKind::UnhandledSupervisionEvent(event),
1101 ..
1102 }) => (event.actor_status.clone(), Some(event)),
1103 Err(err) => (
1104 ActorStatus::Failed(err.to_string()),
1105 Some(ActorSupervisionEvent::new(
1106 self.cell.actor_id().clone(),
1107 ActorStatus::Failed(err.to_string()),
1108 None,
1109 None,
1110 )),
1111 ),
1112 };
1113
1114 if let Some(parent) = self.cell.maybe_unlink_parent() {
1115 if let Some(event) = event {
1116 parent.send_supervision_event_or_crash(event);
1118 }
1119 if let Err(err) = parent.signal(Signal::ChildStopped(self.cell.pid())) {
1122 tracing::error!(
1123 "{}: failed to send stop message to parent pid {}: {:?}",
1124 self.self_id(),
1125 parent.pid(),
1126 err
1127 );
1128 }
1129 } else {
1130 if let Some(event) = event {
1136 self.proc.handle_supervision_event(event);
1137 }
1138 }
1139 self.change_status(actor_status);
1140 }
1141
/// Runs the actor loop, then stops all children and waits for them.
///
/// A panic inside the loop is caught and converted into an
/// `ActorErrorKind::Panic` error carrying the panic message and backtrace.
/// Returns the result of the actor loop, unless receiving a signal fails
/// while waiting for the children, in which case that error is returned.
async fn run_actor_tree(
    &mut self,
    actor: &mut A,
    mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
    work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
) -> Result<(), ActorError> {
    let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers, work_rx))
        .catch_unwind()
        .await
    {
        Ok(result) => result,
        Err(err) => {
            // Panic payloads are usually `&str` or `String`; anything else
            // gets a placeholder message.
            let err_msg = err
                .downcast_ref::<&str>()
                .copied()
                .or_else(|| err.downcast_ref::<String>().map(|s| s.as_str()))
                .unwrap_or("panic cannot be downcasted");

            let backtrace = panic_handler::take_panic_backtrace()
                .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
            Err(ActorError::new(
                self.self_id().clone(),
                ActorErrorKind::Panic(anyhow::anyhow!("{}\n{}", err_msg, backtrace)),
            ))
        }
    };

    if let Err(ref err) = result {
        tracing::error!("{}: actor failure: {}", self.self_id(), err);
    }
    self.change_status(ActorStatus::Stopping);

    // Ask every child to stop. Children that cannot be signalled are
    // unlinked afterwards (not while iterating), so they are not waited for.
    let mut to_unlink = Vec::new();
    for child in self.cell.child_iter() {
        if let Err(err) = child.value().signal(Signal::Stop) {
            tracing::error!(
                "{}: failed to send stop signal to child pid {}: {:?}",
                self.self_id(),
                child.key(),
                err
            );
            to_unlink.push(child.value().clone());
        }
    }
    for child in to_unlink {
        self.cell.unlink(&child);
    }

    // Wait until no children are left. Signals other than `ChildStopped` are
    // ignored here.
    // NOTE(review): a receive error returns early via `?` and discards
    // `result` — confirm this is intended.
    let (mut signal_receiver, _) = actor_loop_receivers;
    while self.cell.child_count() > 0 {
        if let Signal::ChildStopped(pid) = signal_receiver.recv().await? {
            assert!(self.cell.get_child(pid).is_none());
        }
    }

    result
}
1210
/// Initializes the actor and runs its message loop until a stop signal
/// arrives or an error occurs.
///
/// Each iteration handles one unit of work, one signal, or one supervision
/// event. On `Signal::DrainAndStop` the work already queued is processed
/// before returning; on `Signal::Stop` it is not.
///
/// # Errors
///
/// Returns an error when initialization fails, a handler fails, receiving a
/// signal fails, or a supervision event is not handled.
///
/// # Panics
///
/// Panics when the work queue is closed, or when a `ChildStopped` signal
/// names a child that is still linked.
#[tracing::instrument(level = "info", skip_all, fields(actor_id = %self.self_id()))]
async fn run(
    &mut self,
    actor: &mut A,
    actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
    work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
) -> Result<(), ActorError> {
    let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;

    self.change_status(ActorStatus::Initializing);
    actor
        .init(self)
        .await
        .map_err(|err| ActorError::new(self.self_id().clone(), ActorErrorKind::Init(err)))?;
    // Assigned exactly once, on the only path that breaks out of the loop.
    let need_drain;
    'messages: loop {
        self.change_status(ActorStatus::Idle);
        let metric_pairs =
            hyperactor_telemetry::kv_pairs!("actor_id" => self.self_id().to_string());
        tokio::select! {
            work = work_rx.recv() => {
                ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
                ACTOR_MESSAGE_QUEUE_SIZE.add(-1, metric_pairs);
                // NOTE(review): `let _ =` drops the value returned by `start`
                // immediately; if it is a guard that records the elapsed time
                // on drop, the handler below is not measured — confirm.
                let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
                let work = work.expect("inconsistent work queue state");
                if let Err(err) = work.handle(actor, self).await {
                    // Give pending supervision events a chance to be handled
                    // before the processing error is reported.
                    for supervision_event in supervision_event_receiver.drain() {
                        self.handle_supervision_event(actor, supervision_event).await?;
                    }
                    return Err(ActorError::new(self.self_id().clone(), ActorErrorKind::Processing(err)));
                }
            }
            signal = signal_receiver.recv() => {
                let signal = signal.map_err(ActorError::from);
                tracing::debug!("Received signal {signal:?}");
                match signal? {
                    signal@(Signal::Stop | Signal::DrainAndStop) => {
                        need_drain = matches!(signal, Signal::DrainAndStop);
                        break 'messages;
                    },
                    Signal::ChildStopped(pid) => {
                        assert!(self.cell.get_child(pid).is_none());
                    },
                }
            }
            Ok(supervision_event) = supervision_event_receiver.recv() => {
                self.handle_supervision_event(actor, supervision_event).await?;
            }
        }
        // Counted once per completed loop iteration, whichever branch ran
        // (work, `ChildStopped` signal, or supervision event).
        self.cell
            .inner
            .num_processed_messages
            .fetch_add(1, Ordering::SeqCst);
    }

    if need_drain {
        self.change_status(ActorStatus::Stopping);
        let mut n = 0;
        // Process what is already queued, without waiting for more.
        while let Ok(work) = work_rx.try_recv() {
            if let Err(err) = work.handle(actor, self).await {
                return Err(ActorError::new(
                    self.self_id().clone(),
                    ActorErrorKind::Processing(err),
                ));
            }
            n += 1;
        }
        tracing::debug!("drained {} messages", n);
    }
    tracing::debug!("exited actor loop: {}", self.self_id());
    self.change_status(ActorStatus::Stopped);
    Ok(())
}
1285
1286 async fn handle_supervision_event(
1287 &self,
1288 actor: &mut A,
1289 supervision_event: ActorSupervisionEvent,
1290 ) -> Result<(), ActorError> {
1291 match actor
1293 .handle_supervision_event(self, &supervision_event)
1294 .await
1295 {
1296 Ok(true) => {
1297 Ok(())
1299 }
1300 Ok(false) => {
1301 let supervision_event = ActorSupervisionEvent::new(
1303 self.self_id().clone(),
1304 ActorStatus::Failed("did not handle supervision event".to_string()),
1305 None,
1306 Some(Box::new(supervision_event)),
1307 );
1308 Err(supervision_event.into())
1309 }
1310 Err(err) => {
1311 let supervision_event = ActorSupervisionEvent::new(
1314 self.self_id().clone(),
1315 ActorStatus::Failed(format!("failed to handle supervision event: {}", err)),
1316 None,
1317 Some(Box::new(supervision_event)),
1318 );
1319 Err(supervision_event.into())
1320 }
1321 }
1322 }
1323
/// Handles `message` with the actor's handler for `M`.
///
/// The status is set to `Processing` (with the current time and, when
/// `type_info` is given, the type name and arm of the message) before the
/// handler is invoked inside an `actor_status` span.
///
/// # Safety
///
/// `type_info` is used to inspect `message` through an untyped pointer.
/// NOTE(review): presumably `type_info`, when present, must describe exactly
/// the type `M` — confirm against the contract of `TypeInfo::arm_unchecked`.
async unsafe fn handle_message<M: Message>(
    &mut self,
    actor: &mut A,
    type_info: Option<&'static TypeInfo>,
    headers: Attrs,
    message: M,
) -> Result<(), anyhow::Error>
where
    A: Handler<M>,
{
    let handler = type_info.map(|info| {
        (
            info.typename().to_string(),
            // SAFETY: relies on the caller's guarantee about `type_info`
            // (see the `# Safety` section); the pointer itself is derived
            // from a live reference to `message`.
            unsafe {
                info.arm_unchecked(&message as *const M as *const ())
                    .map(str::to_string)
            },
        )
    });

    self.change_status(ActorStatus::Processing(
        self.clock().system_time_now(),
        handler,
    ));
    crate::mailbox::headers::log_message_latency_if_sampling(
        &headers,
        self.self_id().to_string(),
    );
    // The span's `name` field is the textual form of the status just set.
    let span = tracing::debug_span!(
        "actor_status",
        actor_id = self.self_id().to_string(),
        actor_name = self.self_id().name(),
        name = self.cell.status().borrow().to_string(),
    );

    let context = Context::new(self, headers);
    actor.handle(&context, message).instrument(span).await
}
1366
1367 pub(crate) async fn spawn<C: Actor>(
1369 &self,
1370 params: C::Params,
1371 ) -> anyhow::Result<ActorHandle<C>> {
1372 self.proc.spawn_child(self.cell.clone(), params).await
1373 }
1374
1375 pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> {
1377 self.proc.child_instance(self.cell.clone())
1378 }
1379
1380 pub fn port<M: Message>(&self) -> PortHandle<M>
1383 where
1384 A: Handler<M>,
1385 {
1386 self.ports.get()
1387 }
1388
1389 pub fn handle(&self) -> ActorHandle<A> {
1391 ActorHandle::new(self.cell.clone(), Arc::clone(&self.ports))
1392 }
1393
1394 pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
1396 self.cell.bind(self.ports.as_ref())
1397 }
1398
1399 #[doc(hidden)]
1401 pub fn mailbox_for_py(&self) -> &Mailbox {
1402 &self.mailbox
1403 }
1404
1405 pub fn clock(&self) -> &(impl Clock + use<A>) {
1407 &self.proc.state().clock
1408 }
1409
1410 pub fn proc(&self) -> &Proc {
1412 &self.proc
1413 }
1414
1415 #[doc(hidden)]
1419 pub fn clone_for_py(&self) -> Self {
1420 Self {
1421 proc: self.proc.clone(),
1422 cell: self.cell.clone(),
1423 mailbox: self.mailbox.clone(),
1424 ports: self.ports.clone(),
1425 status_tx: self.status_tx.clone(),
1426 sequencer: self.sequencer.clone(),
1427 id: self.id,
1428 }
1429 }
1430
1431 fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1433 self.cell.inner.actor_task_handle.get()
1434 }
1435
1436 pub fn sequencer(&self) -> &Sequencer {
1438 &self.sequencer
1439 }
1440
1441 pub fn instance_id(&self) -> Uuid {
1443 self.id
1444 }
1445}
1446
1447impl<A: Actor> Drop for Instance<A> {
1448 fn drop(&mut self) {
1449 self.status_tx.send_if_modified(|status| {
1450 if status.is_terminal() {
1451 false
1452 } else {
1453 *status = ActorStatus::Stopped;
1454 true
1455 }
1456 });
1457 }
1458}
1459
1460impl<A: Actor> context::Mailbox for Instance<A> {
1461 fn mailbox(&self) -> &Mailbox {
1462 &self.mailbox
1463 }
1464}
1465
1466impl<A: Actor> context::Mailbox for Context<'_, A> {
1467 fn mailbox(&self) -> &Mailbox {
1468 &self.mailbox
1469 }
1470}
1471
1472impl<A: Actor> context::Mailbox for &Instance<A> {
1473 fn mailbox(&self) -> &Mailbox {
1474 &self.mailbox
1475 }
1476}
1477
1478impl<A: Actor> context::Mailbox for &Context<'_, A> {
1479 fn mailbox(&self) -> &Mailbox {
1480 &self.mailbox
1481 }
1482}
1483
1484impl<A: Actor> context::Actor for Instance<A> {
1485 type A = A;
1486 fn instance(&self) -> &Instance<A> {
1487 self
1488 }
1489}
1490
1491impl<A: Actor> context::Actor for Context<'_, A> {
1492 type A = A;
1493 fn instance(&self) -> &Instance<A> {
1494 self
1495 }
1496}
1497
1498impl<A: Actor> context::Actor for &Instance<A> {
1499 type A = A;
1500 fn instance(&self) -> &Instance<A> {
1501 self
1502 }
1503}
1504
1505impl<A: Actor> context::Actor for &Context<'_, A> {
1506 type A = A;
1507 fn instance(&self) -> &Instance<A> {
1508 self
1509 }
1510}
1511
1512#[derive(Debug)]
1513enum ActorType {
1514 Named(&'static TypeInfo),
1515 Anonymous(&'static str),
1516}
1517
1518impl ActorType {
1519 fn type_name(&self) -> &'static str {
1521 match self {
1522 Self::Named(info) => info.typename(),
1523 Self::Anonymous(name) => name,
1524 }
1525 }
1526}
1527
1528#[derive(Clone, Debug)]
1534pub struct InstanceCell {
1535 inner: Arc<InstanceState>,
1536}
1537
1538#[derive(Debug)]
1539struct InstanceState {
1540 actor_id: ActorId,
1542
1543 actor_type: ActorType,
1545
1546 proc: Proc,
1548
1549 actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
1551
1552 status: watch::Receiver<ActorStatus>,
1554
1555 parent: WeakInstanceCell,
1557
1558 children: DashMap<Index, InstanceCell>,
1560
1561 actor_task_handle: OnceLock<JoinHandle<()>>,
1563
1564 exported_named_ports: DashMap<u64, &'static str>,
1566
1567 num_processed_messages: AtomicU64,
1569
1570 recording: Recording,
1573
1574 ports: Arc<dyn Any + Send + Sync>,
1577}
1578
1579impl InstanceState {
1580 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1583 self.parent
1584 .upgrade()
1585 .filter(|parent| parent.inner.unlink(self))
1586 }
1587
1588 fn unlink(&self, child: &InstanceState) -> bool {
1590 assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
1591 self.children.remove(&child.actor_id.pid()).is_some()
1592 }
1593}
1594
1595impl InstanceCell {
1596 fn new(
1599 actor_id: ActorId,
1600 actor_type: ActorType,
1601 proc: Proc,
1602 actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
1603 status: watch::Receiver<ActorStatus>,
1604 parent: Option<InstanceCell>,
1605 ports: Arc<dyn Any + Send + Sync>,
1606 ) -> Self {
1607 let _ais = actor_id.to_string();
1608 let cell = Self {
1609 inner: Arc::new(InstanceState {
1610 actor_id: actor_id.clone(),
1611 actor_type,
1612 proc: proc.clone(),
1613 actor_loop,
1614 status,
1615 parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
1616 children: DashMap::new(),
1617 actor_task_handle: OnceLock::new(),
1618 exported_named_ports: DashMap::new(),
1619 num_processed_messages: AtomicU64::new(0),
1620 recording: hyperactor_telemetry::recorder().record(64),
1621 ports,
1622 }),
1623 };
1624 cell.maybe_link_parent();
1625 proc.inner
1626 .instances
1627 .insert(actor_id.clone(), cell.downgrade());
1628 cell
1629 }
1630
1631 fn wrap(inner: Arc<InstanceState>) -> Self {
1632 Self { inner }
1633 }
1634
1635 pub(crate) fn actor_id(&self) -> &ActorId {
1637 &self.inner.actor_id
1638 }
1639
1640 pub(crate) fn pid(&self) -> Index {
1642 self.inner.actor_id.pid()
1643 }
1644
1645 #[allow(dead_code)]
1647 pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1648 self.inner.actor_task_handle.get()
1649 }
1650
1651 pub(crate) fn status(&self) -> &watch::Receiver<ActorStatus> {
1653 &self.inner.status
1654 }
1655
1656 #[allow(clippy::result_large_err)] pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
1659 if let Some((signal_port, _)) = &self.inner.actor_loop {
1660 signal_port.send(signal).map_err(ActorError::from)
1661 } else {
1662 tracing::warn!(
1663 "{}: attempted to send signal {} to detached actor",
1664 self.inner.actor_id,
1665 signal
1666 );
1667 Ok(())
1668 }
1669 }
1670
1671 pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) {
1680 match &self.inner.actor_loop {
1681 Some((_, supervision_port)) => {
1682 if let Err(err) = supervision_port.send(event) {
1683 tracing::error!(
1684 "{}: failed to send supervision event to actor: {:?}. Crash the process.",
1685 self.actor_id(),
1686 err
1687 );
1688 std::process::exit(1);
1689 }
1690 }
1691 None => {
1692 tracing::error!(
1693 "{}: failed: {}: cannot send supervision event to detached actor: crashing",
1694 self.actor_id(),
1695 event,
1696 );
1697 std::process::exit(1);
1698 }
1699 }
1700 }
1701
1702 pub fn downgrade(&self) -> WeakInstanceCell {
1704 WeakInstanceCell {
1705 inner: Arc::downgrade(&self.inner),
1706 }
1707 }
1708
1709 fn link(&self, child: InstanceCell) {
1711 assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1712 self.inner.children.insert(child.pid(), child);
1713 }
1714
1715 fn unlink(&self, child: &InstanceCell) {
1717 assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1718 self.inner.children.remove(&child.pid());
1719 }
1720
1721 fn maybe_link_parent(&self) {
1723 if let Some(parent) = self.inner.parent.upgrade() {
1724 parent.link(self.clone());
1725 }
1726 }
1727
1728 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1731 self.inner.maybe_unlink_parent()
1732 }
1733
1734 #[allow(dead_code)]
1736 fn get_parent_cell(&self) -> Option<InstanceCell> {
1737 self.inner.parent.upgrade()
1738 }
1739
1740 fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, Index, InstanceCell>> {
1743 self.inner.children.iter()
1744 }
1745
1746 fn child_count(&self) -> usize {
1748 self.inner.children.len()
1749 }
1750
1751 fn get_child(&self, pid: Index) -> Option<InstanceCell> {
1753 self.inner.children.get(&pid).map(|child| child.clone())
1754 }
1755
1756 pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &Ports<A>) -> ActorRef<R> {
1759 <R as Binds<A>>::bind(ports);
1760 ports.bind::<Signal>();
1762 ports.bind::<Undeliverable<MessageEnvelope>>();
1763 for entry in ports.bound.iter() {
1765 self.inner
1766 .exported_named_ports
1767 .insert(*entry.key(), entry.value());
1768 }
1769 ActorRef::attest(self.actor_id().clone())
1770 }
1771
1772 pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
1774 let ports = Arc::clone(&self.inner.ports).downcast::<Ports<A>>().ok()?;
1775 Some(ActorHandle::new(self.clone(), ports))
1776 }
1777}
1778
1779impl Drop for InstanceState {
1780 fn drop(&mut self) {
1781 if let Some(parent) = self.maybe_unlink_parent() {
1782 tracing::debug!(
1783 "instance {} was dropped with parent {} still linked",
1784 self.actor_id,
1785 parent.actor_id()
1786 );
1787 }
1788 if self.proc.inner.instances.remove(&self.actor_id).is_none() {
1789 tracing::error!("instance {} was dropped but not in proc", self.actor_id);
1790 }
1791 }
1792}
1793
1794#[derive(Debug, Clone)]
1797pub struct WeakInstanceCell {
1798 inner: Weak<InstanceState>,
1799}
1800
1801impl Default for WeakInstanceCell {
1802 fn default() -> Self {
1803 Self::new()
1804 }
1805}
1806
1807impl WeakInstanceCell {
1808 pub fn new() -> Self {
1810 Self { inner: Weak::new() }
1811 }
1812
1813 pub fn upgrade(&self) -> Option<InstanceCell> {
1815 self.inner.upgrade().map(InstanceCell::wrap)
1816 }
1817}
1818
1819pub struct Ports<A: Actor> {
1824 ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
1825 bound: DashMap<u64, &'static str>,
1826 mailbox: Mailbox,
1827 workq: OrderedSender<WorkCell<A>>,
1828}
1829
1830#[derive(Serialize, Deserialize, Clone, Named, AttrValue)]
1832pub struct SeqInfo {
1833 pub session_id: Uuid,
1835 pub seq: u64,
1837}
1838
1839impl fmt::Display for SeqInfo {
1840 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1841 write!(f, "{}:{}", self.session_id, self.seq)
1842 }
1843}
1844
1845impl std::str::FromStr for SeqInfo {
1846 type Err = anyhow::Error;
1847
1848 fn from_str(s: &str) -> Result<Self, Self::Err> {
1849 let parts: Vec<_> = s.split(':').collect();
1850 if parts.len() != 2 {
1851 return Err(anyhow::anyhow!("invalid SeqInfo: {}", s));
1852 }
1853 let session_id: Uuid = parts[0].parse()?;
1854 let seq: u64 = parts[1].parse()?;
1855 Ok(SeqInfo { session_id, seq })
1856 }
1857}
1858
1859declare_attrs! {
1860 pub attr SEQ_INFO: SeqInfo;
1863}
1864
1865impl<A: Actor> Ports<A> {
1866 fn new(mailbox: Mailbox, workq: OrderedSender<WorkCell<A>>) -> Self {
1867 Self {
1868 ports: DashMap::new(),
1869 bound: DashMap::new(),
1870 mailbox,
1871 workq,
1872 }
1873 }
1874
1875 pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
1877 where
1878 A: Handler<M>,
1879 {
1880 let key = TypeId::of::<M>();
1881 match self.ports.entry(key) {
1882 Entry::Vacant(entry) => {
1883 assert_ne!(
1885 key,
1886 TypeId::of::<Signal>(),
1887 "cannot provision Signal port through `Ports::get`"
1888 );
1889
1890 let type_info = TypeInfo::get_by_typeid(key);
1891 let workq = self.workq.clone();
1892 let actor_id = self.mailbox.actor_id().to_string();
1893 let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
1894 let seq_info = headers.get(SEQ_INFO).cloned();
1895
1896 let work = WorkCell::new(move |actor: &mut A, instance: &mut Instance<A>| {
1897 Box::pin(async move {
1898 unsafe {
1900 instance
1901 .handle_message(actor, type_info, headers, msg)
1902 .await
1903 }
1904 })
1905 });
1906 ACTOR_MESSAGE_QUEUE_SIZE.add(
1907 1,
1908 hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.clone()),
1909 );
1910 if workq.enable_buffering {
1911 let SeqInfo { session_id, seq } =
1912 seq_info.expect("SEQ_INFO must be set when buffering is enabled");
1913
1914 workq.send(session_id, seq, work).map_err(|e| match e {
1917 OrderedSenderError::InvalidZeroSeq(_) => {
1918 anyhow::anyhow!("seq must be greater than 0")
1919 }
1920 OrderedSenderError::SendError(e) => anyhow::Error::from(e),
1921 OrderedSenderError::FlushError(e) => e,
1922 })
1923 } else {
1924 workq.direct_send(work).map_err(anyhow::Error::from)
1925 }
1926 });
1927 entry.insert(Box::new(port.clone()));
1928 port
1929 }
1930 Entry::Occupied(entry) => {
1931 let port = entry.get();
1932 port.downcast_ref::<PortHandle<M>>().unwrap().clone()
1933 }
1934 }
1935 }
1936
1937 pub(crate) fn open_message_port<M: Message>(&self) -> Option<(PortHandle<M>, PortReceiver<M>)> {
1940 match self.ports.entry(TypeId::of::<M>()) {
1941 Entry::Vacant(entry) => {
1942 let (port, receiver) = self.mailbox.open_port();
1943 entry.insert(Box::new(port.clone()));
1944 Some((port, receiver))
1945 }
1946 Entry::Occupied(_) => None,
1947 }
1948 }
1949
1950 pub fn bind<M: RemoteMessage>(&self)
1952 where
1953 A: Handler<M>,
1954 {
1955 self.bind_to::<M>(M::port());
1956 }
1957
1958 pub fn bind_to<M: RemoteMessage>(&self, port_index: u64)
1962 where
1963 A: Handler<M>,
1964 {
1965 match self.bound.entry(port_index) {
1966 Entry::Vacant(entry) => {
1967 self.get::<M>().bind_to(port_index);
1968 entry.insert(M::typename());
1969 }
1970 Entry::Occupied(entry) => {
1971 assert_eq!(
1972 *entry.get(),
1973 M::typename(),
1974 "bind {}: port index {} already bound to type {}",
1975 M::typename(),
1976 port_index,
1977 entry.get(),
1978 );
1979 }
1980 }
1981 }
1982}
1983
1984#[cfg(test)]
1985mod tests {
1986 use std::assert_matches::assert_matches;
1987 use std::sync::atomic::AtomicBool;
1988
1989 use hyperactor_macros::export;
1990 use maplit::hashmap;
1991 use serde_json::json;
1992 use tokio::sync::Barrier;
1993 use tokio::sync::oneshot;
1994 use tracing::Level;
1995 use tracing_subscriber::layer::SubscriberExt;
1996 use tracing_test::internal::logs_with_scope_contain;
1997
1998 use super::*;
1999 use crate as hyperactor;
2001 use crate::HandleClient;
2002 use crate::Handler;
2003 use crate::OncePortRef;
2004 use crate::PortRef;
2005 use crate::clock::RealClock;
2006 use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
2007 use crate::test_utils::process_assertion::assert_termination;
2008
2009 impl ActorTreeSnapshot {
2010 #[allow(dead_code)]
2011 fn empty(pid: Index) -> Self {
2012 Self {
2013 pid,
2014 type_name: String::new(),
2015 status: ActorStatus::Idle,
2016 stats: ActorStats::default(),
2017 handlers: HashMap::new(),
2018 children: HashMap::new(),
2019 events: Vec::new(),
2020 spans: Vec::new(),
2021 }
2022 }
2023
2024 fn empty_typed(pid: Index, type_name: String) -> Self {
2025 Self {
2026 pid,
2027 type_name,
2028 status: ActorStatus::Idle,
2029 stats: ActorStats::default(),
2030 handlers: HashMap::new(),
2031 children: HashMap::new(),
2032 events: Vec::new(),
2033 spans: Vec::new(),
2034 }
2035 }
2036 }
2037
2038 #[derive(Debug, Default, Actor)]
2039 #[export]
2040 struct TestActor;
2041
2042 #[derive(Handler, HandleClient, Debug)]
2043 enum TestActorMessage {
2044 Reply(oneshot::Sender<()>),
2045 Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
2046 Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
2047 Noop(),
2048 Fail(anyhow::Error),
2049 Panic(String),
2050 Spawn(oneshot::Sender<ActorHandle<TestActor>>),
2051 }
2052
2053 impl TestActor {
2054 async fn spawn_child(parent: &ActorHandle<TestActor>) -> ActorHandle<TestActor> {
2055 let (tx, rx) = oneshot::channel();
2056 parent.send(TestActorMessage::Spawn(tx)).unwrap();
2057 rx.await.unwrap()
2058 }
2059 }
2060
2061 #[async_trait]
2062 #[crate::forward(TestActorMessage)]
2063 impl TestActorMessageHandler for TestActor {
2064 async fn reply(
2065 &mut self,
2066 _cx: &crate::Context<Self>,
2067 sender: oneshot::Sender<()>,
2068 ) -> Result<(), anyhow::Error> {
2069 sender.send(()).unwrap();
2070 Ok(())
2071 }
2072
2073 async fn wait(
2074 &mut self,
2075 _cx: &crate::Context<Self>,
2076 sender: oneshot::Sender<()>,
2077 receiver: oneshot::Receiver<()>,
2078 ) -> Result<(), anyhow::Error> {
2079 sender.send(()).unwrap();
2080 receiver.await.unwrap();
2081 Ok(())
2082 }
2083
2084 async fn forward(
2085 &mut self,
2086 _cx: &crate::Context<Self>,
2087 destination: ActorHandle<TestActor>,
2088 message: Box<TestActorMessage>,
2089 ) -> Result<(), anyhow::Error> {
2090 destination.send(*message)?;
2092 Ok(())
2093 }
2094
2095 async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
2096 Ok(())
2097 }
2098
2099 async fn fail(
2100 &mut self,
2101 _cx: &crate::Context<Self>,
2102 err: anyhow::Error,
2103 ) -> Result<(), anyhow::Error> {
2104 Err(err)
2105 }
2106
2107 async fn panic(
2108 &mut self,
2109 _cx: &crate::Context<Self>,
2110 err_msg: String,
2111 ) -> Result<(), anyhow::Error> {
2112 panic!("{}", err_msg);
2113 }
2114
2115 async fn spawn(
2116 &mut self,
2117 cx: &crate::Context<Self>,
2118 reply: oneshot::Sender<ActorHandle<TestActor>>,
2119 ) -> Result<(), anyhow::Error> {
2120 let handle = <Self as Actor>::spawn(cx, ()).await?;
2121 reply.send(handle).unwrap();
2122 Ok(())
2123 }
2124 }
2125
2126 #[tracing_test::traced_test]
2127 #[tokio::test]
2128 async fn test_spawn_actor() {
2129 let proc = Proc::local();
2130 let handle = proc.spawn::<TestActor>("test", ()).await.unwrap();
2131
2132 assert!(logs_contain(
2134 format!(
2135 "{}: spawned with {:?}",
2136 handle.actor_id(),
2137 handle.cell().actor_task_handle().unwrap(),
2138 )
2139 .as_str()
2140 ));
2141
2142 let mut state = handle.status().clone();
2143
2144 let (tx, rx) = oneshot::channel::<()>();
2147 handle.send(TestActorMessage::Reply(tx)).unwrap();
2148 rx.await.unwrap();
2149
2150 state
2151 .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
2152 .await
2153 .unwrap();
2154
2155 let (enter_tx, enter_rx) = oneshot::channel::<()>();
2157 let (exit_tx, exit_rx) = oneshot::channel::<()>();
2158
2159 handle
2160 .send(TestActorMessage::Wait(enter_tx, exit_rx))
2161 .unwrap();
2162 enter_rx.await.unwrap();
2163 assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= RealClock.system_time_now());
2164 exit_tx.send(()).unwrap();
2165
2166 state
2167 .wait_for(|state| matches!(*state, ActorStatus::Idle))
2168 .await
2169 .unwrap();
2170
2171 handle.drain_and_stop().unwrap();
2172 handle.await;
2173 assert_matches!(*state.borrow(), ActorStatus::Stopped);
2174 }
2175
2176 #[tokio::test]
2177 async fn test_proc_actors_messaging() {
2178 let proc = Proc::local();
2179 let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
2180 let second = proc.spawn::<TestActor>("second", ()).await.unwrap();
2181 let (tx, rx) = oneshot::channel::<()>();
2182 let reply_message = TestActorMessage::Reply(tx);
2183 first
2184 .send(TestActorMessage::Forward(second, Box::new(reply_message)))
2185 .unwrap();
2186 rx.await.unwrap();
2187 }
2188
2189 #[derive(Debug, Default, Actor)]
2190 struct LookupTestActor;
2191
2192 #[derive(Handler, HandleClient, Debug)]
2193 enum LookupTestMessage {
2194 ActorExists(ActorRef<TestActor>, #[reply] OncePortRef<bool>),
2195 }
2196
2197 #[async_trait]
2198 #[crate::forward(LookupTestMessage)]
2199 impl LookupTestMessageHandler for LookupTestActor {
2200 async fn actor_exists(
2201 &mut self,
2202 cx: &crate::Context<Self>,
2203 actor_ref: ActorRef<TestActor>,
2204 ) -> Result<bool, anyhow::Error> {
2205 Ok(actor_ref.downcast_handle(cx).is_some())
2206 }
2207 }
2208
2209 #[tokio::test]
2210 async fn test_actor_lookup() {
2211 let proc = Proc::local();
2212 let (client, _handle) = proc.instance("client").unwrap();
2213
2214 let target_actor = proc.spawn::<TestActor>("target", ()).await.unwrap();
2215 let target_actor_ref = target_actor.bind();
2216 let lookup_actor = proc.spawn::<LookupTestActor>("lookup", ()).await.unwrap();
2217
2218 assert!(
2219 lookup_actor
2220 .actor_exists(&client, target_actor_ref.clone())
2221 .await
2222 .unwrap()
2223 );
2224
2225 assert!(
2227 !lookup_actor
2228 .actor_exists(
2229 &client,
2230 ActorRef::attest(target_actor.actor_id().child_id(123).clone())
2231 )
2232 .await
2233 .unwrap()
2234 );
2235 assert!(
2237 !lookup_actor
2238 .actor_exists(&client, ActorRef::attest(lookup_actor.actor_id().clone()))
2239 .await
2240 .unwrap()
2241 );
2242
2243 target_actor.drain_and_stop().unwrap();
2244 target_actor.await;
2245
2246 assert!(
2247 !lookup_actor
2248 .actor_exists(&client, target_actor_ref)
2249 .await
2250 .unwrap()
2251 );
2252
2253 lookup_actor.drain_and_stop().unwrap();
2254 lookup_actor.await;
2255 }
2256
2257 fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
2258 assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id());
2259 assert_eq!(
2260 child.inner.parent.upgrade().unwrap().actor_id(),
2261 parent.actor_id()
2262 );
2263 assert_matches!(
2264 parent.inner.children.get(&child.pid()),
2265 Some(node) if node.actor_id() == child.actor_id()
2266 );
2267 }
2268
2269 #[tracing_test::traced_test]
2270 #[tokio::test]
2271 async fn test_spawn_child() {
2272 let proc = Proc::local();
2273
2274 let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
2275 let second = TestActor::spawn_child(&first).await;
2276 let third = TestActor::spawn_child(&second).await;
2277
2278 assert!(logs_with_scope_contain(
2280 "hyperactor::proc",
2281 format!(
2282 "{}: spawned with {:?}",
2283 first.actor_id(),
2284 first.cell().actor_task_handle().unwrap()
2285 )
2286 .as_str()
2287 ));
2288 assert!(logs_with_scope_contain(
2289 "hyperactor::proc",
2290 format!(
2291 "{}: spawned with {:?}",
2292 second.actor_id(),
2293 second.cell().actor_task_handle().unwrap()
2294 )
2295 .as_str()
2296 ));
2297 assert!(logs_with_scope_contain(
2298 "hyperactor::proc",
2299 format!(
2300 "{}: spawned with {:?}",
2301 third.actor_id(),
2302 third.cell().actor_task_handle().unwrap()
2303 )
2304 .as_str()
2305 ));
2306
2307 assert_eq!(first.actor_id().proc_id(), proc.proc_id());
2309 assert_eq!(second.actor_id(), &first.actor_id().child_id(1));
2310 assert_eq!(third.actor_id(), &first.actor_id().child_id(2));
2311
2312 validate_link(third.cell(), second.cell());
2314 validate_link(second.cell(), first.cell());
2315 assert!(first.cell().inner.parent.upgrade().is_none());
2316
2317 third.drain_and_stop().unwrap();
2319 third.await;
2320 assert!(second.cell().inner.children.is_empty());
2321 validate_link(second.cell(), first.cell());
2322
2323 second.drain_and_stop().unwrap();
2324 second.await;
2325 assert!(first.cell().inner.children.is_empty());
2326 }
2327
2328 #[tokio::test]
2329 async fn test_child_lifecycle() {
2330 let proc = Proc::local();
2331
2332 let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2333 let root_1 = TestActor::spawn_child(&root).await;
2334 let root_2 = TestActor::spawn_child(&root).await;
2335 let root_2_1 = TestActor::spawn_child(&root_2).await;
2336
2337 root.drain_and_stop().unwrap();
2338 root.await;
2339
2340 for actor in [root_1, root_2, root_2_1] {
2341 assert!(actor.send(TestActorMessage::Noop()).is_err());
2342 assert_matches!(actor.await, ActorStatus::Stopped);
2343 }
2344 }
2345
2346 #[tokio::test]
2347 async fn test_parent_failure() {
2348 let proc = Proc::local();
2349 ProcSupervisionCoordinator::set(&proc).await.unwrap();
2352
2353 let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2354 let root_1 = TestActor::spawn_child(&root).await;
2355 let root_2 = TestActor::spawn_child(&root).await;
2356 let root_2_1 = TestActor::spawn_child(&root_2).await;
2357
2358 root_2
2359 .send(TestActorMessage::Fail(anyhow::anyhow!(
2360 "some random failure"
2361 )))
2362 .unwrap();
2363 let root_2_actor_id = root_2.actor_id().clone();
2364 assert_matches!(
2365 root_2.await,
2366 ActorStatus::Failed(err) if err == format!("serving {}: processing error: some random failure", root_2_actor_id)
2367 );
2368
2369 assert_eq!(
2372 root.await,
2373 ActorStatus::Failed("did not handle supervision event".to_string())
2374 );
2375 assert_eq!(root_2_1.await, ActorStatus::Stopped);
2376 assert_eq!(root_1.await, ActorStatus::Stopped);
2377 }
2378
2379 #[tokio::test]
2380 async fn test_actor_ledger() {
2381 async fn wait_until_idle(actor_handle: &ActorHandle<TestActor>) {
2382 actor_handle
2383 .status()
2384 .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
2385 .await
2386 .unwrap();
2387 }
2388
2389 let proc = Proc::local();
2390
2391 let root: ActorHandle<TestActor> = proc.spawn::<TestActor>("root", ()).await.unwrap();
2393 wait_until_idle(&root).await;
2394 {
2395 let snapshot = proc.state().ledger.snapshot();
2396 assert_eq!(
2397 snapshot.roots,
2398 hashmap! {
2399 root.actor_id().clone() =>
2400 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2401 },
2402 );
2403 }
2404
2405 let another_root: ActorHandle<TestActor> =
2407 proc.spawn::<TestActor>("another_root", ()).await.unwrap();
2408 wait_until_idle(&another_root).await;
2409 {
2410 let snapshot = proc.state().ledger.snapshot();
2411 assert_eq!(
2412 snapshot.roots,
2413 hashmap! {
2414 root.actor_id().clone() =>
2415 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2416 another_root.actor_id().clone() =>
2417 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2418 },
2419 );
2420 }
2421
2422 another_root.drain_and_stop().unwrap();
2425 another_root.await;
2426 {
2427 let snapshot = proc.state().ledger.snapshot();
2428 assert_eq!(
2429 snapshot.roots,
2430 hashmap! { root.actor_id().clone() =>
2431 ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2432 },
2433 );
2434 }
2435
2436 let root_1 = TestActor::spawn_child(&root).await;
2442 wait_until_idle(&root_1).await;
2443 {
2444 let snapshot = proc.state().ledger.snapshot();
2445 assert_eq!(
2446 snapshot.roots,
2447 hashmap! {
2448 root.actor_id().clone() => ActorTreeSnapshot {
2449 pid: 0,
2450 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2451 status: ActorStatus::Idle,
2452 stats: ActorStats { num_processed_messages: 1 },
2453 handlers: HashMap::new(),
2454 children: hashmap! {
2455 root_1.actor_id().pid() =>
2456 ActorTreeSnapshot::empty_typed(
2457 root_1.actor_id().pid(),
2458 "hyperactor::proc::tests::TestActor".to_string()
2459 )
2460 },
2461 events: Vec::new(),
2462 spans: Vec::new(),
2463 }
2464 },
2465 );
2466 }
2467
2468 let root_1_1 = TestActor::spawn_child(&root_1).await;
2469 wait_until_idle(&root_1_1).await;
2470 {
2471 let snapshot = proc.state().ledger.snapshot();
2472 assert_eq!(
2473 snapshot.roots,
2474 hashmap! {
2475 root.actor_id().clone() => ActorTreeSnapshot {
2476 pid: 0,
2477 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2478 status: ActorStatus::Idle,
2479 stats: ActorStats { num_processed_messages: 1 },
2480 handlers: HashMap::new(),
2481 children: hashmap!{
2482 root_1.actor_id().pid() =>
2483 ActorTreeSnapshot {
2484 pid: root_1.actor_id().pid(),
2485 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2486 status: ActorStatus::Idle,
2487 stats: ActorStats { num_processed_messages: 1 },
2488 handlers: HashMap::new(),
2489 children: hashmap!{
2490 root_1_1.actor_id().pid() =>
2491 ActorTreeSnapshot::empty_typed(
2492 root_1_1.actor_id().pid(),
2493 "hyperactor::proc::tests::TestActor".to_string()
2494 )
2495 },
2496 events: Vec::new(),
2497 spans: Vec::new(),
2498 }
2499 },
2500 events: Vec::new(),
2501 spans: Vec::new(),
2502 },
2503 }
2504 );
2505 }
2506
2507 let root_2 = TestActor::spawn_child(&root).await;
2508 wait_until_idle(&root_2).await;
2509 {
2510 let snapshot = proc.state().ledger.snapshot();
2511 assert_eq!(
2512 snapshot.roots,
2513 hashmap! {
2514 root.actor_id().clone() => ActorTreeSnapshot {
2515 pid: 0,
2516 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2517 status: ActorStatus::Idle,
2518 stats: ActorStats { num_processed_messages: 2 },
2519 handlers: HashMap::new(),
2520 children: hashmap!{
2521 root_2.actor_id().pid() =>
2522 ActorTreeSnapshot{
2523 pid: root_2.actor_id().pid(),
2524 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2525 status: ActorStatus::Idle,
2526 stats: ActorStats::default(),
2527 handlers: HashMap::new(),
2528 children: HashMap::new(),
2529 events: Vec::new(),
2530 spans: Vec::new(),
2531 },
2532 root_1.actor_id().pid() =>
2533 ActorTreeSnapshot{
2534 pid: root_1.actor_id().pid(),
2535 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2536 status: ActorStatus::Idle,
2537 stats: ActorStats { num_processed_messages: 1 },
2538 handlers: HashMap::new(),
2539 children: hashmap!{
2540 root_1_1.actor_id().pid() =>
2541 ActorTreeSnapshot::empty_typed(
2542 root_1_1.actor_id().pid(),
2543 "hyperactor::proc::tests::TestActor".to_string()
2544 )
2545 },
2546 events: Vec::new(),
2547 spans: Vec::new(),
2548 },
2549 },
2550 events: Vec::new(),
2551 spans: Vec::new(),
2552 },
2553 }
2554 );
2555 }
2556
2557 root_1.drain_and_stop().unwrap();
2559 root_1.await;
2560 {
2561 let snapshot = proc.state().ledger.snapshot();
2562 assert_eq!(
2563 snapshot.roots,
2564 hashmap! {
2565 root.actor_id().clone() => ActorTreeSnapshot {
2566 pid: 0,
2567 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2568 status: ActorStatus::Idle,
2569 stats: ActorStats { num_processed_messages: 3 },
2570 handlers: HashMap::new(),
2571 children: hashmap!{
2572 root_2.actor_id().pid() =>
2573 ActorTreeSnapshot {
2574 pid: root_2.actor_id().pid(),
2575 type_name: "hyperactor::proc::tests::TestActor".to_string(),
2576 status: ActorStatus::Idle,
2577 stats: ActorStats::default(),
2578 handlers: HashMap::new(),
2579 children: HashMap::new(),
2580 events: Vec::new(),
2581 spans: Vec::new(),
2582 }
2583 },
2584 events: Vec::new(),
2585 spans: Vec::new(),
2586 },
2587 }
2588 );
2589 }
2590
2591 root.drain_and_stop().unwrap();
2593 root.await;
2594 {
2595 let snapshot = proc.state().ledger.snapshot();
2596 assert_eq!(snapshot.roots, hashmap! {});
2597 }
2598 }
2599
2600 #[tokio::test]
2601 async fn test_multi_handler() {
2602 #[derive(Debug)]
2606 struct TestActor(Arc<AtomicUsize>);
2607
2608 #[async_trait]
2609 impl Actor for TestActor {
2610 type Params = Arc<AtomicUsize>;
2611
2612 async fn new(param: Arc<AtomicUsize>) -> Result<Self, anyhow::Error> {
2613 Ok(Self(param))
2614 }
2615 }
2616
2617 #[async_trait]
2618 impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
2619 async fn handle(
2620 &mut self,
2621 cx: &crate::Context<Self>,
2622 message: OncePortHandle<PortHandle<usize>>,
2623 ) -> anyhow::Result<()> {
2624 message.send(cx.port())?;
2625 Ok(())
2626 }
2627 }
2628
2629 #[async_trait]
2630 impl Handler<usize> for TestActor {
2631 async fn handle(
2632 &mut self,
2633 _cx: &crate::Context<Self>,
2634 message: usize,
2635 ) -> anyhow::Result<()> {
2636 self.0.fetch_add(message, Ordering::SeqCst);
2637 Ok(())
2638 }
2639 }
2640
2641 let proc = Proc::local();
2642 let state = Arc::new(AtomicUsize::new(0));
2643 let handle = proc
2644 .spawn::<TestActor>("test", state.clone())
2645 .await
2646 .unwrap();
2647 let client = proc.attach("client").unwrap();
2648 let (tx, rx) = client.open_once_port();
2649 handle.send(tx).unwrap();
2650 let usize_handle = rx.recv().await.unwrap();
2651 usize_handle.send(123).unwrap();
2652
2653 handle.drain_and_stop().unwrap();
2654 handle.await;
2655
2656 assert_eq!(state.load(Ordering::SeqCst), 123);
2657 }
2658
2659 #[tokio::test]
2660 async fn test_actor_panic() {
2661 panic_handler::set_panic_hook();
2663
2664 let proc = Proc::local();
2665 ProcSupervisionCoordinator::set(&proc).await.unwrap();
2668
2669 let (client, _handle) = proc.instance("client").unwrap();
2670 let actor_handle = proc.spawn::<TestActor>("test", ()).await.unwrap();
2671 actor_handle
2672 .panic(&client, "some random failure".to_string())
2673 .await
2674 .unwrap();
2675 let actor_status = actor_handle.await;
2676
2677 assert_matches!(actor_status, ActorStatus::Failed(_));
2681 if let ActorStatus::Failed(err) = actor_status {
2682 let error_msg = err.to_string();
2683 assert!(error_msg.contains("some random failure"));
2685 assert!(error_msg.contains("rust_begin_unwind"));
2689 }
2690 }
2691
2692 #[tokio::test]
2693 async fn test_local_supervision_propagation() {
2694 #[derive(Debug)]
2695 struct TestActor(Arc<AtomicBool>, bool);
2696
2697 #[async_trait]
2698 impl Actor for TestActor {
2699 type Params = (Arc<AtomicBool>, bool);
2700
2701 async fn new(param: (Arc<AtomicBool>, bool)) -> Result<Self, anyhow::Error> {
2702 Ok(Self(param.0, param.1))
2703 }
2704
2705 async fn handle_supervision_event(
2706 &mut self,
2707 _this: &Instance<Self>,
2708 _event: &ActorSupervisionEvent,
2709 ) -> Result<bool, anyhow::Error> {
2710 if !self.1 {
2711 return Ok(false);
2712 }
2713
2714 tracing::error!(
2715 "{}: supervision event received: {:?}",
2716 _this.self_id(),
2717 _event
2718 );
2719 self.0.store(true, Ordering::SeqCst);
2720 Ok(true)
2721 }
2722 }
2723
2724 #[async_trait]
2725 impl Handler<String> for TestActor {
2726 async fn handle(
2727 &mut self,
2728 cx: &crate::Context<Self>,
2729 message: String,
2730 ) -> anyhow::Result<()> {
2731 tracing::info!("{} received message: {}", cx.self_id(), message);
2732 Err(anyhow::anyhow!(message))
2733 }
2734 }
2735
2736 let proc = Proc::local();
2737 let reported_event = ProcSupervisionCoordinator::set(&proc).await.unwrap();
2738
2739 let root_state = Arc::new(AtomicBool::new(false));
2740 let root_1_state = Arc::new(AtomicBool::new(false));
2741 let root_1_1_state = Arc::new(AtomicBool::new(false));
2742 let root_1_1_1_state = Arc::new(AtomicBool::new(false));
2743 let root_2_state = Arc::new(AtomicBool::new(false));
2744 let root_2_1_state = Arc::new(AtomicBool::new(false));
2745
2746 let root = proc
2747 .spawn::<TestActor>("root", (root_state.clone(), false))
2748 .await
2749 .unwrap();
2750 let root_1 = proc
2751 .spawn_child::<TestActor>(
2752 root.cell().clone(),
2753 (
2754 root_1_state.clone(),
2755 true, ),
2757 )
2758 .await
2759 .unwrap();
2760 let root_1_1 = proc
2761 .spawn_child::<TestActor>(root_1.cell().clone(), (root_1_1_state.clone(), false))
2762 .await
2763 .unwrap();
2764 let root_1_1_1 = proc
2765 .spawn_child::<TestActor>(root_1_1.cell().clone(), (root_1_1_1_state.clone(), false))
2766 .await
2767 .unwrap();
2768 let root_2 = proc
2769 .spawn_child::<TestActor>(root.cell().clone(), (root_2_state.clone(), false))
2770 .await
2771 .unwrap();
2772 let root_2_1 = proc
2773 .spawn_child::<TestActor>(root_2.cell().clone(), (root_2_1_state.clone(), false))
2774 .await
2775 .unwrap();
2776
2777 root_1_1_1
2780 .send::<String>("some random failure".into())
2781 .unwrap();
2782
2783 root_2_1
2786 .send::<String>("some random failure".into())
2787 .unwrap();
2788
2789 RealClock.sleep(Duration::from_secs(1)).await;
2790
2791 assert!(!root_state.load(Ordering::SeqCst));
2792 assert!(root_1_state.load(Ordering::SeqCst));
2793 assert!(!root_1_1_state.load(Ordering::SeqCst));
2794 assert!(!root_1_1_1_state.load(Ordering::SeqCst));
2795 assert!(!root_2_state.load(Ordering::SeqCst));
2796 assert!(!root_2_1_state.load(Ordering::SeqCst));
2797 assert_eq!(
2798 reported_event.event().map(|e| e.actor_id.clone()),
2799 Some(root.actor_id().clone())
2800 );
2801 }
2802
// Checks what a parent observes when a child's own supervision handler
// returns an error while handling its child's failure: the parent should
// receive exactly one event, for the child, with the grandchild's event
// nested inside it.
#[tokio::test]
async fn test_supervision_event_handler_propagates() {
    // Actor whose message handler and supervision handler both always fail.
    #[derive(Debug)]
    struct FailingSupervisionActor;

    #[async_trait]
    impl Actor for FailingSupervisionActor {
        type Params = ();

        async fn new(_: ()) -> Result<Self, anyhow::Error> {
            Ok(Self)
        }

        // Always errors; this text appears in the status the parent is
        // expected to see for the child.
        async fn handle_supervision_event(
            &mut self,
            _this: &Instance<Self>,
            _event: &ActorSupervisionEvent,
        ) -> Result<bool, anyhow::Error> {
            anyhow::bail!("failed to handle supervision event!")
        }
    }

    #[async_trait]
    impl Handler<String> for FailingSupervisionActor {
        // Fails with the received text as the error.
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: String,
        ) -> anyhow::Result<()> {
            Err(anyhow::anyhow!(message))
        }
    }

    // Actor that forwards every supervision event it sees into a channel and
    // reports the event as handled.
    #[derive(Debug)]
    struct ParentActor(tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>);

    #[async_trait]
    impl Actor for ParentActor {
        type Params = tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>;

        async fn new(
            supervision_events: tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>,
        ) -> Result<Self, anyhow::Error> {
            Ok(Self(supervision_events))
        }

        async fn handle_supervision_event(
            &mut self,
            _this: &Instance<Self>,
            event: &ActorSupervisionEvent,
        ) -> Result<bool, anyhow::Error> {
            self.0.send(event.clone()).unwrap();
            Ok(true)
        }
    }

    let proc = Proc::local();

    let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();

    // Hierarchy: parent -> child -> grandchild.
    let parent = proc.spawn::<ParentActor>("parent", event_tx).await.unwrap();
    let child = proc
        .spawn_child::<FailingSupervisionActor>(parent.cell().clone(), ())
        .await
        .unwrap();
    let grandchild = proc
        .spawn_child::<FailingSupervisionActor>(child.cell().clone(), ())
        .await
        .unwrap();

    // Capture the ids now: awaiting the handles below consumes them.
    let child_actor_id = child.actor_id().clone();
    let grandchild_actor_id = grandchild.actor_id().clone();

    // The grandchild's message handler fails on this message.
    grandchild.send("trigger failure".to_string()).unwrap();

    // Both the grandchild and the child are expected to end up failed.
    assert!(grandchild.await.is_failed());
    assert!(child.await.is_failed());

    // The parent receives the child's event, which wraps the grandchild's.
    // NOTE(review): "local[0].parent[2]" is presumably the grandchild's actor
    // id as derived from the spawn order above — confirm before reordering.
    assert_eq!(
        event_rx.recv().await.unwrap(),
        ActorSupervisionEvent::new(
            child_actor_id,
            ActorStatus::Failed(
                "failed to handle supervision event: failed to handle supervision event!"
                    .to_string(),
            ),
            None,
            Some(Box::new(ActorSupervisionEvent::new(
                grandchild_actor_id,
                ActorStatus::Failed(
                    "serving local[0].parent[2]: processing error: trigger failure".to_string()
                ),
                None,
                None,
            ))),
        )
    );

    // No further event is queued at this point.
    assert!(event_rx.try_recv().is_err());
}
2906
2907 #[tokio::test]
2908 async fn test_instance() {
2909 #[derive(Debug, Default, Actor)]
2910 struct TestActor;
2911
2912 #[async_trait]
2913 impl Handler<(String, PortRef<String>)> for TestActor {
2914 async fn handle(
2915 &mut self,
2916 cx: &crate::Context<Self>,
2917 (message, port): (String, PortRef<String>),
2918 ) -> anyhow::Result<()> {
2919 port.send(cx, message)?;
2920 Ok(())
2921 }
2922 }
2923
2924 let proc = Proc::local();
2925
2926 let (instance, handle) = proc.instance("my_test_actor").unwrap();
2927
2928 let child_actor = TestActor::spawn(&instance, ()).await.unwrap();
2929
2930 let (port, mut receiver) = instance.open_port();
2931 child_actor
2932 .send(("hello".to_string(), port.bind()))
2933 .unwrap();
2934
2935 let message = receiver.recv().await.unwrap();
2936 assert_eq!(message, "hello");
2937
2938 child_actor.drain_and_stop().unwrap();
2939 child_actor.await;
2940
2941 assert_eq!(*handle.status().borrow(), ActorStatus::Client);
2942 drop(instance);
2943 assert_eq!(*handle.status().borrow(), ActorStatus::Stopped);
2944 handle.await;
2945 }
2946
2947 #[tokio::test]
2948 async fn test_proc_terminate_without_coordinator() {
2949 if std::env::var("CARGO_TEST").is_ok() {
2950 eprintln!("test skipped as it hangs when run by cargo in sandcastle");
2951 return;
2952 }
2953
2954 let process = async {
2955 let proc = Proc::local();
2956 let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2960 let (client, _handle) = proc.instance("client").unwrap();
2961 root.fail(&client, anyhow::anyhow!("some random failure"))
2962 .await
2963 .unwrap();
2964 RealClock.sleep(Duration::from_secs(30)).await;
2968 };
2969
2970 assert_termination(|| process, 1).await.unwrap();
2971 }
2972
2973 fn trace_and_block(fut: impl Future) {
2974 tracing::subscriber::with_default(
2975 tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
2976 || {
2977 tokio::runtime::Builder::new_current_thread()
2978 .enable_all()
2979 .build()
2980 .unwrap()
2981 .block_on(fut)
2982 },
2983 );
2984 }
2985
// Checks that events and spans emitted from message handlers show up in the
// actor cell's recording. Currently ignored (see the attribute's reason).
#[ignore = "until trace recording is turned back on"]
#[test]
fn test_handler_logging() {
    #[derive(Debug, Default, Actor)]
    struct LoggingActor;

    impl LoggingActor {
        // Rendezvous with the actor: sends it a two-party barrier and waits
        // on the same barrier, so this returns only once the actor's barrier
        // handler is running. Presumably messages are handled in send order,
        // making this a "previous messages were processed" sync point.
        async fn wait(handle: &ActorHandle<Self>) {
            let barrier = Arc::new(Barrier::new(2));
            handle.send(barrier.clone()).unwrap();
            barrier.wait().await;
        }
    }

    #[async_trait]
    impl Handler<String> for LoggingActor {
        // Logs the text as an info event.
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: String,
        ) -> anyhow::Result<()> {
            tracing::info!("{}", message);
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<u64> for LoggingActor {
        // Emits an info event carrying the value in a `number` field.
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: u64,
        ) -> anyhow::Result<()> {
            tracing::event!(Level::INFO, number = message);
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<Arc<Barrier>> for LoggingActor {
        // Counterpart of `LoggingActor::wait`: blocks on the shared barrier.
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: Arc<Barrier>,
        ) -> anyhow::Result<()> {
            message.wait().await;
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
        // Enters `child_span`, then waits on the first and the second barrier
        // in turn, so the test can inspect state while the span is entered.
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            barriers: Arc<(Barrier, Barrier)>,
        ) -> anyhow::Result<()> {
            let inner = tracing::span!(Level::INFO, "child_span");
            let _inner_guard = inner.enter();
            barriers.0.wait().await;
            barriers.1.wait().await;
            Ok(())
        }
    }

    trace_and_block(async {
        let handle = LoggingActor::spawn_detached(()).await.unwrap();
        handle.send("hello world".to_string()).unwrap();
        handle.send("hello world again".to_string()).unwrap();
        handle.send(123u64).unwrap();

        // Sync with the actor before reading what it recorded.
        LoggingActor::wait(&handle).await;

        // Exactly the three events emitted above are expected, in send order.
        let events = handle.cell().inner.recording.tail();
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].json_value(), json!({ "message": "hello world" }));
        assert_eq!(
            events[1].json_value(),
            json!({ "message": "hello world again" })
        );
        assert_eq!(events[2].json_value(), json!({ "number": 123 }));

        // Snapshot the span stacks while the handler sits between its two
        // barrier waits, i.e. while `child_span` is still entered.
        let stacks = {
            let barriers = Arc::new((Barrier::new(2), Barrier::new(2)));
            handle.send(Arc::clone(&barriers)).unwrap();
            // Passing the first barrier means the handler has entered the span.
            barriers.0.wait().await;
            let stacks = handle.cell().inner.recording.stacks();
            // Release the handler so it can return.
            barriers.1.wait().await;
            stacks
        };
        assert_eq!(stacks.len(), 1);
        assert_eq!(stacks[0].len(), 1);
        assert_eq!(stacks[0][0].name(), "child_span");
    })
}
3081}