1use std::any::Any;
90use std::any::TypeId;
91use std::collections::BTreeMap;
92use std::collections::HashMap;
93use std::fmt;
94use std::future::Future;
95use std::ops::Deref;
96use std::panic;
97use std::panic::AssertUnwindSafe;
98use std::panic::Location as PanicLocation;
99use std::pin::Pin;
100use std::sync::Arc;
101use std::sync::Condvar;
102use std::sync::Mutex;
103use std::sync::OnceLock;
104use std::sync::RwLock;
105use std::sync::Weak;
106use std::sync::atomic::AtomicBool;
107use std::sync::atomic::AtomicU64;
108use std::sync::atomic::AtomicUsize;
109use std::sync::atomic::Ordering;
110use std::time::Duration;
111use std::time::Instant;
112use std::time::SystemTime;
113
114use async_trait::async_trait;
115use dashmap::DashMap;
116use dashmap::DashSet;
117use dashmap::mapref::entry::Entry;
118use dashmap::mapref::multiple::RefMulti;
119use futures::FutureExt;
120use hyperactor_config::Flattrs;
121use hyperactor_telemetry::ActorStatusEvent;
122use hyperactor_telemetry::generate_actor_status_event_id;
123use hyperactor_telemetry::hash_to_u64;
124use hyperactor_telemetry::notify_actor_status_changed;
125use hyperactor_telemetry::notify_message;
126use hyperactor_telemetry::notify_message_status;
127use hyperactor_telemetry::recorder::Recording;
128use serde::Deserialize;
129use serde::Serialize;
130use tokio::sync::Notify;
131use tokio::sync::mpsc;
132use tokio::sync::watch;
133use tokio::task::JoinHandle;
134use tracing::Instrument;
135use tracing::Span;
136use typeuri::Named;
137use uuid::Uuid;
138use wirevalue::TypeInfo;
139
140use crate as hyperactor;
141use crate::Actor;
142use crate::ActorAddr;
143use crate::ActorRef;
144use crate::Addr;
145use crate::Data;
146use crate::Handler;
147use crate::Location;
148use crate::Message;
149use crate::PortAddr;
150use crate::ProcAddr;
151use crate::ProcId;
152use crate::RemoteMessage;
153use crate::actor::ActorError;
154use crate::actor::ActorErrorKind;
155use crate::actor::ActorHandle;
156use crate::actor::ActorStatus;
157use crate::actor::AnyActorHandle;
158use crate::actor::Binds;
159use crate::actor::HandlerInfo;
160use crate::actor::Referable;
161use crate::actor::RemoteHandles;
162use crate::actor::Signal;
163use crate::actor::StopMode;
164use crate::actor_local::ActorLocalStorage;
165use crate::channel;
166use crate::channel::ChannelAddr;
167use crate::channel::ChannelError;
168use crate::channel::ChannelTransport;
169use crate::config;
170use crate::context;
171use crate::context::Mailbox as _;
172use crate::endpoint::Endpoint as _;
173use crate::gateway::Gateway;
174use crate::id::ActorId;
175use crate::id::Label;
176use crate::id::Uid;
177use crate::introspect::IntrospectMessage;
178use crate::introspect::IntrospectResult;
179use crate::mailbox::BoxedMailboxSender;
180use crate::mailbox::DeliveryError;
181use crate::mailbox::DialMailboxRouter;
182use crate::mailbox::IntoBoxedMailboxSender as _;
183use crate::mailbox::Mailbox;
184use crate::mailbox::MailboxClient;
185use crate::mailbox::MailboxMuxer;
186use crate::mailbox::MailboxSender;
187use crate::mailbox::MessageEnvelope;
188use crate::mailbox::OncePortHandle;
189use crate::mailbox::OncePortReceiver;
190use crate::mailbox::PanickingMailboxSender;
191use crate::mailbox::PortHandle;
192use crate::mailbox::PortReceiver;
193use crate::mailbox::Undeliverable;
194use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
195use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
196use crate::metrics::ACTOR_MESSAGES_RECEIVED;
197use crate::subject::AsSubject as _;
198
/// Singleton name of the legacy "local" pseudo-singleton proc.
///
/// Passed to `ProcAddr::singleton` by [`Proc::legacy_local_pseudo_singleton`].
pub const LEGACY_LOCAL_PROC_NAME: &str = "local";

/// Singleton name of the legacy "service" pseudo-singleton proc.
///
/// Passed to `ProcAddr::singleton` by [`Proc::legacy_service_pseudo_singleton`].
pub const LEGACY_SERVICE_PROC_NAME: &str = "service";
210
/// Milliseconds elapsed since the Unix epoch, per the system clock.
///
/// A system clock set before the epoch yields `0` instead of an error.
fn wall_clock_epoch_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}

/// Proc-wide message-queue statistics, aggregated across the proc's actors.
///
/// Every counter uses relaxed atomics: these values are observability data,
/// not synchronization.
pub(crate) struct ProcQueueStats {
    /// Messages currently enqueued across the proc.
    running_total: AtomicU64,
    /// Largest value `running_total` has reached.
    high_water_mark: AtomicU64,
    /// Time (ms since epoch, from `clock`) at which the queue was last seen
    /// non-empty; `0` means "never".
    last_nonzero_epoch_ms: AtomicU64,
    /// Time source; swappable in tests.
    clock: fn() -> u64,
}

impl ProcQueueStats {
    /// Zeroed statistics driven by the real wall clock.
    fn new() -> Self {
        Self::from_clock(wall_clock_epoch_ms)
    }

    /// Zeroed statistics driven by a caller-supplied clock (test builds only).
    #[cfg(test)]
    fn with_clock(clock: fn() -> u64) -> Self {
        Self::from_clock(clock)
    }

    /// Shared constructor: every counter starts at zero.
    fn from_clock(clock: fn() -> u64) -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            running_total: zero(),
            high_water_mark: zero(),
            last_nonzero_epoch_ms: zero(),
            clock,
        }
    }

    /// Current time in ms since the epoch, according to the configured clock.
    fn now_ms(&self) -> u64 {
        let clock = self.clock;
        clock()
    }

    /// Number of messages currently enqueued across the proc.
    pub(crate) fn running_total(&self) -> u64 {
        let total = &self.running_total;
        total.load(Ordering::Relaxed)
    }

    /// Peak value ever reached by the running total.
    pub(crate) fn high_water_mark(&self) -> u64 {
        let peak = &self.high_water_mark;
        peak.load(Ordering::Relaxed)
    }

    /// Milliseconds since the queue was last observed non-empty, or `None`
    /// if it never was. Clock skew backwards saturates to `0`.
    pub(crate) fn last_nonzero_age_ms(&self) -> Option<u64> {
        let stamp = self.last_nonzero_epoch_ms.load(Ordering::Relaxed);
        (stamp != 0).then(|| self.now_ms().saturating_sub(stamp))
    }
}
293
294fn account_enqueue(queue_depth: &AtomicU64, proc_stats: &ProcQueueStats, actor_id: &str) {
301 queue_depth.fetch_add(1, Ordering::Relaxed);
302 let new_total = proc_stats.running_total.fetch_add(1, Ordering::Relaxed) + 1;
303 proc_stats
305 .high_water_mark
306 .fetch_max(new_total, Ordering::Relaxed);
307 proc_stats
309 .last_nonzero_epoch_ms
310 .store(proc_stats.now_ms(), Ordering::Relaxed);
311 ACTOR_MESSAGE_QUEUE_SIZE.add(
312 1,
313 hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.to_owned()),
314 );
315}
316
317fn account_dequeue(queue_depth: &AtomicU64, proc_stats: &ProcQueueStats, actor_id: &str) {
324 queue_depth.fetch_sub(1, Ordering::Relaxed);
325 let prev_total = proc_stats.running_total.fetch_sub(1, Ordering::Relaxed);
326 if prev_total > 1 {
330 proc_stats
331 .last_nonzero_epoch_ms
332 .store(proc_stats.now_ms(), Ordering::Relaxed);
333 }
334 ACTOR_MESSAGE_QUEUE_SIZE.add(
335 -1,
336 hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.to_owned()),
337 );
338}
339
340fn account_cancel_enqueue(queue_depth: &AtomicU64, proc_stats: &ProcQueueStats, actor_id: &str) {
350 queue_depth.fetch_sub(1, Ordering::Relaxed);
351 proc_stats.running_total.fetch_sub(1, Ordering::Relaxed);
352 ACTOR_MESSAGE_QUEUE_SIZE.add(
353 -1,
354 hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.to_owned()),
355 );
356}
357use crate::ordering::OrderedSender;
358use crate::ordering::OrderedSenderError;
359use crate::ordering::SEQ_INFO;
360use crate::ordering::SeqInfo;
361use crate::ordering::Sequencer;
362use crate::ordering::ordered_channel;
363use crate::panic_handler;
364use crate::supervision::ActorSupervisionEvent;
365
/// Handshake reply a host sends to a client attaching via
/// [`Proc::attach_to_host`]: the proc address the client must adopt.
#[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
pub struct BootstrapAssignment {
    /// Address assigned to the attaching proc; both its id and its location
    /// are used to build the attached proc.
    pub proc_id: ProcAddr,
}
wirevalue::register_type!(BootstrapAssignment);
374
/// Payload of the first envelope sent by [`Proc::attach_to_host`], asking the
/// host for a [`BootstrapAssignment`].
#[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
pub struct AttachRequest;
wirevalue::register_type!(AttachRequest);
381
/// Messages a host sends to an attached client over the duplex channel.
#[derive(Debug, Serialize, Deserialize, typeuri::Named)]
#[expect(
    clippy::large_enum_variant,
    reason = "wire-protocol enum; boxing Envelope ripples through all channel/networking construction and destructure sites and needs a wire-compatibility review — separate diff"
)]
pub enum Host2Client {
    /// Handshake reply; only valid as the first message on the channel.
    Bootstrap(BootstrapAssignment),
    /// A regular message envelope delivered after the handshake.
    Envelope(MessageEnvelope),
}
wirevalue::register_type!(Host2Client);
396
/// Adapts the client side of the host duplex channel to a plain
/// `Rx<MessageEnvelope>`, treating any post-handshake `Bootstrap` as an error.
pub struct AttachRx(pub channel::duplex::DuplexRx<Host2Client>);
400
401#[async_trait]
402impl channel::Rx<MessageEnvelope> for AttachRx {
403 async fn recv(&mut self) -> Result<MessageEnvelope, ChannelError> {
404 match self.0.recv().await? {
405 Host2Client::Envelope(envelope) => Ok(envelope),
406 Host2Client::Bootstrap(_) => Err(ChannelError::Other(anyhow::anyhow!(
407 "unexpected bootstrap message after handshake"
408 ))),
409 }
410 }
411
412 fn addr(&self) -> ChannelAddr {
413 self.0.addr()
414 }
415
416 async fn join(self) {
417 self.0.join().await
418 }
419}
420
/// Handle to a proc: the container in which actors are spawned, their
/// mailboxes bound, and through whose gateway they communicate.
///
/// Cloning is cheap; all clones share the same `ProcState` through an `Arc`.
#[derive(Clone)]
pub struct Proc {
    inner: Arc<ProcState>,
}
431
432impl fmt::Debug for Proc {
433 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
434 f.debug_struct("Proc")
435 .field("proc_id", &self.inner.proc_id)
436 .finish()
437 }
438}
439
/// State shared by every clone of a [`Proc`].
struct ProcState {
    /// Identity of this proc.
    proc_id: ProcId,

    /// Gateway used for routing; it also owns the proc's default location and
    /// its forwarder.
    gateway: Gateway,

    /// Muxer into which `Proc::bind_mailbox` binds mailboxes.
    proc_muxer: MailboxMuxer,

    /// Uids already claimed by root actors; used to reject duplicate root spawns.
    reserved_roots: DashSet<crate::id::Uid>,

    /// Uids already claimed through `Proc::ensure_child_uid`.
    reserved_child_uids: DashSet<crate::id::Uid>,

    /// Weak references to instance cells, keyed by actor id. Entries whose
    /// cell can no longer be upgraded are skipped by readers.
    instances: DashMap<ActorId, WeakInstanceCell>,

    /// Proc-wide queue statistics, shared via `Arc`.
    queue_stats: Arc<ProcQueueStats>,

    /// Introspection snapshots for terminated actors, keyed by actor id.
    terminated_snapshots: DashMap<ActorId, TerminatedSnapshot>,

    /// Port receiving unhandled supervision events; set at most once.
    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,

    /// Address of the actor owning `supervision_coordinator_port`.
    supervision_coordinator_actor_id: OnceLock<ActorAddr>,

    /// Mailbox server started by `Proc::direct` / `Proc::attach_to_host`;
    /// taken by `Proc::join_mailbox_server`.
    mailbox_server_handle: std::sync::Mutex<Option<crate::mailbox::MailboxServerHandle>>,
}
490
/// Introspection data kept for an actor after it terminated.
struct TerminatedSnapshot {
    /// Address of the terminated actor (returned by `all_terminated_actor_ids`).
    actor_addr: ActorAddr,
    /// Introspection payload (returned by `terminated_snapshot`).
    payload: crate::introspect::IntrospectResult,
}
495
496impl Drop for ProcState {
497 fn drop(&mut self) {
498 let proc_addr = self.proc_addr();
502 tracing::info!(
503 subject = %proc_addr.subject(),
504 name = "ProcStatus",
505 status = "Dropped"
506 );
507 }
508}
509
510impl ProcState {
511 fn default_location(&self) -> Location {
512 self.gateway.default_location()
513 }
514
515 fn set_default_location(&self, location: Location) {
516 self.gateway.set_default_location(location)
517 }
518
519 fn proc_addr(&self) -> ProcAddr {
520 self.gateway.proc_addr(&self.proc_id)
521 }
522}
523
/// Result of [`Proc::actor_instance`]: an actor instance whose actor loop is
/// not started, together with the receivers that loop would consume
/// (presumably so the caller can drive the actor itself — confirm with callers).
pub struct ActorInstance<A: Actor> {
    /// The actor instance (status `Client`).
    pub instance: Instance<A>,
    /// Handle to the instance.
    pub handle: ActorHandle<A>,
    /// Supervision events destined for this actor.
    pub supervision: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
    /// Control signals destined for this actor.
    pub signal: PortReceiver<Signal>,
    /// Work items destined for this actor.
    pub work: mpsc::UnboundedReceiver<WorkCell<A>>,
}
540
/// Builder for [`Proc`]. The `State` type parameter records which gateway the
/// proc will use; by default it is the process-global gateway.
pub struct Builder<State = GlobalGateway> {
    /// Explicit proc id; `None` generates an anonymous id at build time.
    proc_id: Option<ProcId>,
    state: State,
}

/// Builder state: use `Gateway::global()`.
pub struct GlobalGateway;

/// Builder state: use a caller-provided gateway.
pub struct SharedGateway {
    gateway: Gateway,
}

/// Builder state: create a dedicated gateway on a freshly reserved local
/// channel address, configured with `forwarder`.
pub struct PrivateGateway {
    forwarder: BoxedMailboxSender,
}
559
560impl Builder<GlobalGateway> {
561 pub fn new() -> Self {
563 Self {
564 proc_id: None,
565 state: GlobalGateway,
566 }
567 }
568
569 pub fn shared_gateway(self, gateway: Gateway) -> Builder<SharedGateway> {
571 Builder {
572 proc_id: self.proc_id,
573 state: SharedGateway { gateway },
574 }
575 }
576
577 pub fn private_gateway(self, forwarder: BoxedMailboxSender) -> Builder<PrivateGateway> {
579 Builder {
580 proc_id: self.proc_id,
581 state: PrivateGateway { forwarder },
582 }
583 }
584
585 pub fn build(self) -> Result<Proc, anyhow::Error> {
587 self.build_with_gateway(Gateway::global().clone())
588 }
589}
590
591impl<State> Builder<State> {
592 pub fn proc_id(mut self, proc_id: ProcId) -> Self {
594 self.proc_id = Some(proc_id);
595 self
596 }
597
598 fn build_with_gateway(self, gateway: Gateway) -> Result<Proc, anyhow::Error> {
599 Self::build_proc(self.proc_id, gateway)
600 }
601
602 fn build_proc(proc_id: Option<ProcId>, gateway: Gateway) -> Result<Proc, anyhow::Error> {
603 let proc_id = proc_id.unwrap_or_else(ProcId::anonymous);
604 if is_legacy_pseudo_singleton_proc_id(&proc_id) {
605 anyhow::bail!(
606 "legacy pseudo-singleton proc id '{}' must be constructed with a dedicated Proc constructor",
607 proc_id
608 );
609 }
610 Ok(Proc::from_parts_unchecked(proc_id, gateway))
611 }
612}
613
614impl Builder<SharedGateway> {
615 pub fn build(self) -> Result<Proc, anyhow::Error> {
617 let Builder {
618 proc_id,
619 state: SharedGateway { gateway },
620 } = self;
621 Self::build_proc(proc_id, gateway)
622 }
623}
624
625impl Builder<PrivateGateway> {
626 pub fn build(self) -> Result<Proc, anyhow::Error> {
628 let Builder {
629 proc_id,
630 state: PrivateGateway { forwarder },
631 } = self;
632 let gateway = Gateway::configured(channel::reserve_local_addr().into(), forwarder);
633 Self::build_proc(proc_id, gateway)
634 }
635}
636
637impl Proc {
638 fn from_parts_unchecked(proc_id: ProcId, gateway: Gateway) -> Self {
639 let proc_addr = gateway.proc_addr(&proc_id);
640 tracing::info!(
641 subject = %proc_addr.subject(),
642 name = "ProcStatus",
643 status = "Created"
644 );
645
646 let proc = Self {
647 inner: Arc::new(ProcState {
648 proc_id: proc_id.clone(),
649 gateway: gateway.clone(),
650 proc_muxer: MailboxMuxer::new(),
651 reserved_roots: DashSet::new(),
652 reserved_child_uids: DashSet::new(),
653 instances: DashMap::new(),
654 queue_stats: Arc::new(ProcQueueStats::new()),
655 terminated_snapshots: DashMap::new(),
656 supervision_coordinator_port: OnceLock::new(),
657 supervision_coordinator_actor_id: OnceLock::new(),
658 mailbox_server_handle: std::sync::Mutex::new(None),
659 }),
660 };
661 gateway.attach(&proc);
662 proc
663 }
664
665 fn from_parts(proc_id: ProcId, gateway: Gateway) -> Self {
666 assert_not_legacy_pseudo_singleton_proc_id(&proc_id);
667 Self::from_parts_unchecked(proc_id, gateway)
668 }
669
670 pub fn legacy_local_pseudo_singleton(addr: ChannelAddr, forwarder: BoxedMailboxSender) -> Self {
672 Self::legacy_pseudo_singleton(addr, LEGACY_LOCAL_PROC_NAME, forwarder)
673 }
674
675 pub fn legacy_service_pseudo_singleton(
677 addr: ChannelAddr,
678 forwarder: BoxedMailboxSender,
679 ) -> Self {
680 Self::legacy_pseudo_singleton(addr, LEGACY_SERVICE_PROC_NAME, forwarder)
681 }
682
683 fn legacy_pseudo_singleton(
684 addr: ChannelAddr,
685 name: &'static str,
686 forwarder: BoxedMailboxSender,
687 ) -> Self {
688 let proc_addr = ProcAddr::singleton(addr, name);
689 Self::from_parts_unchecked(
690 proc_addr.id().clone(),
691 Gateway::configured(proc_addr.location().clone(), forwarder),
692 )
693 }
694
695 pub fn anonymous() -> Self {
697 Self::builder()
698 .build()
699 .expect("anonymous proc builder is valid")
700 }
701
702 pub fn instance(label: impl AsRef<str>) -> Self {
704 Self::builder()
705 .proc_id(ProcId::instance(Label::strip(label.as_ref())))
706 .build()
707 .expect("instance proc builder is valid")
708 }
709
710 pub fn singleton(name: impl AsRef<str>) -> Self {
712 Self::builder()
713 .proc_id(ProcId::singleton(Label::strip(name.as_ref())))
714 .build()
715 .expect("singleton proc builder is valid")
716 }
717
718 pub fn isolated() -> Self {
720 Self::builder()
721 .shared_gateway(Gateway::isolated())
722 .build()
723 .expect("isolated proc builder is valid")
724 }
725
726 pub fn builder() -> Builder {
728 Builder::new()
729 }
730
731 pub fn configured(proc_id: impl Into<ProcAddr>, forwarder: BoxedMailboxSender) -> Self {
733 let proc_addr = proc_id.into();
734 Self::from_parts(
735 proc_addr.id().clone(),
736 Gateway::configured(proc_addr.location().clone(), forwarder),
737 )
738 }
739
740 pub fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
745 let (addr, rx) = channel::serve(addr)?;
746 let proc_id = ProcAddr::instance(addr, name);
747 let proc = Self::builder()
748 .proc_id(proc_id.id().clone())
749 .shared_gateway(Gateway::configured(
750 proc_id.location().clone(),
751 DialMailboxRouter::new().into_boxed(),
752 ))
753 .build()
754 .expect("direct proc builder is valid");
755 let handle = proc.gateway().serve_rx(rx);
756 *proc.inner.mailbox_server_handle.lock().unwrap() = Some(handle);
757 Ok(proc)
758 }
759
760 pub async fn attach_to_host(addr: ChannelAddr) -> Result<Self, anyhow::Error> {
766 use crate::channel::Rx;
767 use crate::channel::Tx;
768 let mut duplex_client = channel::duplex::dial::<MessageEnvelope, Host2Client>(addr)?;
769 let duplex_tx = duplex_client.tx();
770 let mut duplex_rx = duplex_client
771 .take_rx()
772 .expect("dial returns a fresh DuplexClient with rx present");
773 let signal_actor_id = ActorAddr::root(
782 ProcAddr::singleton(ChannelAddr::any(channel::ChannelTransport::Local), "attach"),
783 crate::id::Label::strip("attach"),
784 );
785 let signal_port = signal_actor_id.port_addr(crate::port::Port::from(0u64));
786 let mut envelope = MessageEnvelope::serialize(
787 signal_actor_id,
788 signal_port,
789 &AttachRequest,
790 Default::default(),
791 )?;
792 envelope.set_return_undeliverable(false);
793 duplex_tx.post(envelope);
794 let assignment = match duplex_rx.recv().await? {
796 Host2Client::Bootstrap(a) => a,
797 Host2Client::Envelope(_) => {
798 anyhow::bail!("expected bootstrap assignment as first message")
799 }
800 };
801 let proc = Self::builder()
802 .proc_id(assignment.proc_id.id().clone())
803 .shared_gateway(Gateway::configured(
804 assignment.proc_id.location().clone(),
805 MailboxClient::new(duplex_tx).into_boxed(),
806 ))
807 .build()
808 .expect("attached proc builder is valid");
809 let inner_handle = proc.gateway().serve_rx(AttachRx(duplex_rx));
813 let (stopped_tx, mut stopped_rx) = tokio::sync::watch::channel(false);
814 let wrapped_join = tokio::spawn(async move {
815 let _ = stopped_rx.wait_for(|stopped| *stopped).await;
816 inner_handle.stop("proc shutting down");
817 let _ = inner_handle.await;
818 duplex_client.join().await;
819 Ok(())
820 });
821 let handle = crate::mailbox::MailboxServerHandle::from_parts(wrapped_join, stopped_tx);
822 *proc.inner.mailbox_server_handle.lock().unwrap() = Some(handle);
823 Ok(proc)
824 }
825
826 pub fn set_supervision_coordinator(
829 &self,
830 port: PortHandle<ActorSupervisionEvent>,
831 ) -> Result<(), anyhow::Error> {
832 let actor_ref: ActorAddr = port.location().actor_addr();
833 self.state()
834 .supervision_coordinator_port
835 .set(port)
836 .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))?;
837 let _ = self.state().supervision_coordinator_actor_id.set(actor_ref);
838 Ok(())
839 }
840
841 pub fn supervision_coordinator_actor_addr(&self) -> Option<&ActorAddr> {
844 self.state().supervision_coordinator_actor_id.get()
845 }
846
847 pub fn handle_unhandled_supervision_event(
850 &self,
851 cx: &impl context::Actor,
852 event: ActorSupervisionEvent,
853 ) {
854 let result = match self.state().supervision_coordinator_port.get() {
855 Some(port) => {
856 port.post(cx, event.clone());
857 Ok(())
858 }
859 None => {
860 if !event.is_error() {
861 return;
864 }
865 Err(anyhow::anyhow!(
866 "coordinator port is not set for proc {}",
867 self.proc_addr(),
868 ))
869 }
870 };
871 if let Err(err) = result {
872 if !event.is_error() {
873 tracing::debug!(
876 subject = %self.proc_addr().subject(),
877 "dropping non-error supervision event {}: {:?}",
878 event,
879 err
880 );
881 return;
882 }
883 tracing::error!(
884 subject = %self.proc_addr().subject(),
885 "could not propagate supervision event {} due to error: {:?}: crashing",
886 event,
887 err
888 );
889
890 std::process::exit(1);
891 }
892 }
893
894 pub fn proc_id(&self) -> &ProcId {
896 &self.state().proc_id
897 }
898
899 pub fn default_location(&self) -> Location {
901 self.state().default_location()
902 }
903
904 pub fn set_default_location(&self, location: Location) {
906 self.state().set_default_location(location)
907 }
908
909 pub fn proc_addr(&self) -> ProcAddr {
911 self.state().proc_addr()
912 }
913
914 pub fn gateway(&self) -> Gateway {
916 self.state().gateway.clone()
917 }
918
919 pub fn forwarder(&self) -> &BoxedMailboxSender {
922 self.state().gateway.forwarder()
923 }
924
925 pub fn muxer(&self) -> &MailboxMuxer {
928 &self.inner.proc_muxer
929 }
930
931 fn state(&self) -> &ProcState {
933 self.inner.as_ref()
934 }
935
936 pub(crate) fn runtime() -> &'static Proc {
938 static RUNTIME_PROC: OnceLock<Proc> = OnceLock::new();
939 RUNTIME_PROC.get_or_init(|| {
940 let addr = ChannelAddr::any(ChannelTransport::Local);
941 let proc_id = ProcAddr::instance(addr, "hyperactor_runtime");
942 Proc::configured(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
943 })
944 }
945
946 pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
948 let actor_id: ActorAddr = self.allocate_root_id(name)?;
949 Ok(self.bind_mailbox(actor_id))
950 }
951
952 pub fn attach_child(&self, parent_id: &ActorAddr) -> Result<Mailbox, anyhow::Error> {
954 let actor_id: ActorAddr = self.allocate_child_id(parent_id)?;
955 Ok(self.bind_mailbox(actor_id))
956 }
957
958 fn bind_mailbox(&self, actor_id: ActorAddr) -> Mailbox {
960 let mbox = Mailbox::new(actor_id);
961
962 self.state().proc_muxer.bind_mailbox(mbox.clone());
965 mbox
966 }
967
968 pub fn attach_actor<R, M>(
971 &self,
972 name: &str,
973 ) -> Result<(Instance<()>, ActorRef<R>, PortReceiver<M>), anyhow::Error>
974 where
975 M: RemoteMessage,
976 R: Referable + RemoteHandles<M>,
977 {
978 let (instance, _handle) = self.client(name)?;
979 let (_handle, rx) = instance.bind_handler_port::<M>();
980 let actor_ref = ActorRef::attest(instance.self_addr().clone());
981 Ok((instance, actor_ref, rx))
982 }
983
984 pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> Result<ActorHandle<A>, anyhow::Error> {
987 let actor_id: ActorAddr = self.allocate_root_id(name)?;
988 self.spawn_inner(actor_id, actor, None)
989 }
990
991 pub fn spawn_with_uid<A: Actor>(
996 &self,
997 uid: crate::id::Uid,
998 actor: A,
999 ) -> Result<ActorHandle<A>, anyhow::Error> {
1000 let actor_id: ActorAddr = self.allocate_root_uid(uid)?;
1001 self.spawn_inner(actor_id, actor, None)
1002 }
1003
1004 #[hyperactor::instrument(fields(subject = actor_id.subject().to_string()))]
1006 fn spawn_inner<A: Actor>(
1007 &self,
1008 actor_id: ActorAddr,
1009 actor: A,
1010 parent: Option<InstanceCell>,
1011 ) -> Result<ActorHandle<A>, anyhow::Error> {
1012 let (instance, receivers) = Instance::new(self.clone(), actor_id, false, parent);
1013 Ok(instance.start(actor, receivers))
1014 }
1015
1016 pub fn client(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
1021 let actor_id: ActorAddr = self.allocate_root_id(name)?;
1022 let (instance, _receivers) = Instance::new(self.clone(), actor_id, false, None);
1023 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
1024 instance.change_status(ActorStatus::Client);
1025 Ok((instance, handle))
1026 }
1027
1028 pub fn introspectable_instance(
1041 &self,
1042 name: &str,
1043 ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
1044 let actor_id: ActorAddr = self.allocate_root_id(name)?;
1045 let (instance, receivers) = Instance::new(self.clone(), actor_id, false, None);
1046 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
1047 instance.change_status(ActorStatus::Client);
1048 tokio::spawn(crate::introspect::serve_introspect(
1049 instance.inner.cell.clone(),
1050 receivers.introspect,
1051 ));
1052 Ok((instance, handle))
1053 }
1054
1055 pub fn actor_instance<A: Actor>(&self, name: &str) -> Result<ActorInstance<A>, anyhow::Error> {
1061 let actor_id: ActorAddr = self.allocate_root_id(name)?;
1062 let span = tracing::debug_span!(
1063 "actor_instance",
1064 subject = %actor_id.subject(),
1065 );
1066 let _guard = span.enter();
1067 let (instance, receivers) = Instance::new(self.clone(), actor_id.clone(), false, None);
1068 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
1069 instance.change_status(ActorStatus::Client);
1070
1071 tokio::spawn(crate::introspect::serve_introspect(
1072 instance.inner.cell.clone(),
1073 receivers.introspect,
1074 ));
1075
1076 let (signal_rx, supervision_rx) = receivers.actor_loop.unwrap();
1077 Ok(ActorInstance {
1078 instance,
1079 handle,
1080 supervision: supervision_rx,
1081 signal: signal_rx,
1082 work: receivers.work,
1083 })
1084 }
1085
1086 pub fn traverse<F>(&self, f: &mut F)
1095 where
1096 F: FnMut(&InstanceCell, usize),
1097 {
1098 for entry in self.state().instances.iter() {
1099 if entry.key().uid().is_singleton()
1100 && let Some(cell) = entry.value().upgrade()
1101 {
1102 cell.traverse(f);
1103 }
1104 }
1105 }
1106
1107 pub fn queue_depth_total(&self) -> u64 {
1109 self.state().queue_stats.running_total()
1110 }
1111
1112 pub fn queue_depth_high_water_mark(&self) -> u64 {
1114 self.state().queue_stats.high_water_mark()
1115 }
1116
1117 pub fn last_nonzero_queue_depth_age_ms(&self) -> Option<u64> {
1119 self.state().queue_stats.last_nonzero_age_ms()
1120 }
1121
1122 pub fn get_instance(&self, actor_id: &ActorAddr) -> Option<InstanceCell> {
1124 self.get_instance_by_id(actor_id.id())
1125 }
1126
1127 pub fn get_instance_by_id(&self, actor_id: &ActorId) -> Option<InstanceCell> {
1129 self.state()
1130 .instances
1131 .get(actor_id)
1132 .and_then(|cell| cell.upgrade())
1133 }
1134
1135 pub fn root_actor_ids(&self) -> Vec<ActorAddr> {
1144 self.state()
1145 .instances
1146 .iter()
1147 .filter_map(|entry| {
1148 entry
1149 .key()
1150 .uid()
1151 .is_singleton()
1152 .then(|| entry.value().upgrade())
1153 .flatten()
1154 .map(|cell| cell.actor_addr().clone())
1155 })
1156 .collect()
1157 }
1158
1159 pub fn all_actor_ids(&self) -> Vec<ActorAddr> {
1168 self.state()
1169 .instances
1170 .iter()
1171 .filter_map(|entry| {
1172 let cell = entry.value().upgrade()?;
1173 (!cell.status().borrow().is_terminal()).then(|| cell.actor_addr().clone())
1174 })
1175 .collect()
1176 }
1177
1178 pub fn all_instance_keys(&self) -> Vec<ActorId> {
1189 self.state()
1190 .instances
1191 .iter()
1192 .map(|entry| entry.key().clone())
1193 .collect()
1194 }
1195
1196 pub fn terminated_snapshot(
1198 &self,
1199 actor_id: &ActorAddr,
1200 ) -> Option<crate::introspect::IntrospectResult> {
1201 self.state()
1202 .terminated_snapshots
1203 .get(actor_id.id())
1204 .map(|entry| entry.value().payload.clone())
1205 }
1206
1207 pub fn all_terminated_actor_ids(&self) -> Vec<ActorAddr> {
1209 self.state()
1210 .terminated_snapshots
1211 .iter()
1212 .map(|entry| entry.value().actor_addr.clone())
1213 .collect()
1214 }
1215
1216 fn child_instance(
1218 &self,
1219 parent: InstanceCell,
1220 ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
1221 let actor_id = self.allocate_child_id(parent.actor_addr())?;
1222 let _ = tracing::debug_span!(
1223 "child_actor_instance",
1224 subject = %actor_id.subject(),
1225 );
1226
1227 let (instance, _receivers) = Instance::new(self.clone(), actor_id, false, Some(parent));
1228 let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
1231 instance.change_status(ActorStatus::Client);
1232 Ok((instance, handle))
1233 }
1234
1235 pub(crate) fn spawn_child<A: Actor>(
1241 &self,
1242 parent: InstanceCell,
1243 actor: A,
1244 ) -> Result<ActorHandle<A>, anyhow::Error> {
1245 let actor_id = self.allocate_child_id(parent.actor_addr())?;
1246 self.spawn_inner(actor_id, actor, Some(parent))
1247 }
1248
1249 pub(crate) fn spawn_child_with_uid<A: Actor>(
1251 &self,
1252 parent: InstanceCell,
1253 uid: crate::id::Uid,
1254 actor: A,
1255 ) -> Result<ActorHandle<A>, anyhow::Error> {
1256 let actor_id = self.ensure_child_uid(parent.actor_addr(), uid)?;
1257 self.spawn_inner(actor_id, actor, Some(parent))
1258 }
1259
1260 pub(crate) fn spawn_named_child<A: Actor>(
1264 &self,
1265 parent: InstanceCell,
1266 name: &str,
1267 actor: A,
1268 ) -> Result<ActorHandle<A>, anyhow::Error> {
1269 let actor_id = self.allocate_named_child_id(parent.actor_addr(), name)?;
1270 self.spawn_inner(actor_id, actor, Some(parent))
1271 }
1272
1273 pub fn abort_root_actor(&self, root: &ActorId) -> Option<impl Future<Output = ActorAddr>> {
1277 self.state()
1278 .instances
1279 .get(root)
1280 .into_iter()
1281 .flat_map(|entry| entry.value().upgrade())
1282 .map(|cell| {
1283 let actor_addr = cell.actor_addr().clone();
1284 let r1 = actor_addr.clone();
1285 let r2 = actor_addr;
1286 async move {
1290 tokio::task::spawn_blocking(move || {
1291 let h = cell.inner.actor_task_handle.wait();
1292 tracing::debug!("{}: aborting {:?}", r1, h);
1293 h.abort();
1294 })
1295 .await
1296 .unwrap();
1297 r2
1298 }
1299 })
1300 .next()
1301 }
1302
1303 pub fn stop_actor(
1306 &self,
1307 actor_id: &ActorId,
1308 reason: String,
1309 ) -> Option<watch::Receiver<ActorStatus>> {
1310 let cell = match self.state().instances.get(actor_id) {
1315 None => {
1316 tracing::error!(subject = %self.proc_addr().subject(), "no actor {} found", actor_id);
1317 return None;
1318 }
1319 Some(entry) => entry.value().upgrade(),
1320 }; match cell {
1322 None => None, Some(cell) => {
1324 tracing::info!("sending stop signal to {}", cell.actor_addr());
1325 if let Err(err) = cell.signal(Signal::DrainAndStop(reason)) {
1326 tracing::error!(
1327 "failed to send stop signal to uid {}: {:?}",
1328 cell.uid(),
1329 err
1330 );
1331 None
1332 } else {
1333 Some(cell.status().clone())
1334 }
1335 }
1336 }
1337 }
1338
/// Stops every root actor on this proc, waiting up to `timeout` for each to
/// reach a terminal status and aborting those that do not.
///
/// Phases:
/// 1. Send `DrainAndStop(reason)` (via [`Proc::stop_actor`]) to every live
///    root (singleton-uid) actor whose status is not `Client`, except the
///    supervision coordinator.
/// 2. Wait for those actors concurrently, each bounded by `timeout`; abort
///    the ones that time out.
/// 3. Stop the supervision coordinator last, under the same timeout/abort
///    policy. NOTE(review): presumably this lets it keep handling supervision
///    events while the others shut down.
/// 4. Flush the gateway, bounded by `FORWARDER_FLUSH_TIMEOUT`; failures are
///    only logged.
///
/// Returns `(stopped, aborted)` actor addresses. Actors to which no stop
/// signal could be sent appear in neither list. The visible code always
/// returns `Ok`.
#[hyperactor::instrument(fields(subject = self.proc_addr().subject().to_string()))]
pub async fn destroy_and_wait(
    &mut self,
    timeout: Duration,
    reason: &str,
) -> Result<(Vec<ActorAddr>, Vec<ActorAddr>), anyhow::Error> {
    tracing::debug!("proc stopping");

    // Captured up front so the coordinator can be skipped here and stopped last.
    let coordinator_id = self.supervision_coordinator_actor_addr().cloned();

    // Root actor address -> status receiver, for every actor that accepted a
    // stop signal.
    let mut statuses = HashMap::new();
    // Collect into a Vec first so the map iteration finishes before any stop
    // signal is sent.
    for actor_id in self
        .state()
        .instances
        .iter()
        .filter(|entry| entry.key().uid().is_singleton())
        .filter_map(|entry| entry.value().upgrade())
        .filter(|cell| !matches!(*cell.status().borrow(), ActorStatus::Client))
        .map(|cell| cell.actor_addr().clone())
        .collect::<Vec<_>>()
    {
        if coordinator_id.as_ref() == Some(&actor_id) {
            continue;
        }
        if let Some(status) = self.stop_actor(actor_id.id(), reason.to_string()) {
            statuses.insert(actor_id, status);
        }
    }
    tracing::debug!("non-coordinator actors stopped");

    // One wait per signalled actor. If the wait finishes before `timeout`, the
    // actor counts as stopped, even when `wait_for` itself errored (e.g. the
    // status sender was dropped). A timeout yields `None`.
    let waits: Vec<_> = statuses
        .iter_mut()
        .map(|(actor_id, root)| {
            let actor_id = actor_id.clone();
            async move {
                tokio::time::timeout(
                    timeout,
                    root.wait_for(|state: &ActorStatus| state.is_terminal()),
                )
                .await
                .ok()
                .map(|_| actor_id)
            }
        })
        .collect();

    let results = futures::future::join_all(waits).await;
    let mut stopped_actors: Vec<_> = results
        .iter()
        .filter_map(|actor_id| actor_id.as_ref())
        .cloned()
        .collect();
    // Abort every actor that did not stop in time. An actor that is no longer
    // in the instance map (`abort_root_actor` returns `None`) is still
    // reported as aborted.
    let aborted_actors: Vec<_> = statuses
        .iter()
        .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
        .map(|(actor_id, _)| {
            let f = self.abort_root_actor(actor_id.id());
            async move {
                let _ = if let Some(f) = f { Some(f.await) } else { None };
                actor_id.clone()
            }
        })
        .collect();
    let mut aborted_actors = futures::future::join_all(aborted_actors).await;

    // Coordinator last: stop it, wait up to `timeout`, abort if it does not
    // terminate in time.
    if let Some(ref coord_id) = coordinator_id
        && let Some(mut status) = self.stop_actor(coord_id.id(), reason.to_string())
    {
        let stopped =
            tokio::time::timeout(timeout, status.wait_for(|s: &ActorStatus| s.is_terminal()))
                .await
                .is_ok();
        if stopped {
            stopped_actors.push(coord_id.clone());
        } else {
            if let Some(f) = self.abort_root_actor(coord_id.id()) {
                f.await;
            }
            aborted_actors.push(coord_id.clone());
        }
    }

    // Best-effort flush of outbound traffic; errors and timeouts are only logged.
    let flush_timeout = hyperactor_config::global::get(crate::config::FORWARDER_FLUSH_TIMEOUT);
    let gateway = self.gateway();
    match tokio::time::timeout(flush_timeout, gateway.flush()).await {
        Ok(Err(err)) => {
            tracing::warn!("gateway flush failed during proc exit: {:?}", err);
        }
        Err(_elapsed) => {
            tracing::warn!("gateway flush timed out during proc exit");
        }
        Ok(Ok(())) => {}
    }

    tracing::info!(
        "destroy_and_wait: {} actors stopped, {} actors aborted",
        stopped_actors.len(),
        aborted_actors.len()
    );
    Ok((stopped_actors, aborted_actors))
}
1460
1461 pub fn resolve_actor_ref<R: Actor + Referable>(
1483 &self,
1484 actor_ref: &ActorRef<R>,
1485 ) -> Option<ActorHandle<R>> {
1486 let cell = self
1487 .inner
1488 .instances
1489 .get(actor_ref.actor_addr().id())?
1490 .upgrade()?;
1491 if cell.status().borrow().is_terminal() {
1495 return None;
1496 }
1497 cell.downcast_handle()
1498 }
1499
1500 fn allocate_root_id(&self, name: &str) -> Result<ActorAddr, anyhow::Error> {
1504 self.reserve_root(Uid::singleton(Label::strip(name)))
1505 }
1506
1507 fn allocate_root_uid(&self, uid: Uid) -> Result<ActorAddr, anyhow::Error> {
1509 self.reserve_root(uid)
1510 }
1511
1512 fn reserve_root(&self, uid: Uid) -> Result<ActorAddr, anyhow::Error> {
1513 let actor_id = ActorId::new(uid.clone(), self.proc_id().clone(), None);
1514 if !self.state().reserved_roots.insert(uid) {
1515 anyhow::bail!("an actor with id '{}' has already been spawned", actor_id)
1516 }
1517 Ok(ActorAddr::new(actor_id, self.default_location()))
1518 }
1519
    /// Allocate an address for a new anonymous child of `parent_id`.
    ///
    /// # Panics
    ///
    /// Panics if `parent_id` does not belong to this proc.
    #[hyperactor::instrument]
    pub(crate) fn allocate_child_id(
        &self,
        parent_id: &ActorAddr,
    ) -> Result<ActorAddr, anyhow::Error> {
        assert_eq!(parent_id.proc_id(), self.proc_id());
        // Nothing is reserved in proc state here; the address comes straight
        // from `ActorAddr::anonymous_child`, so this never returns `Err`.
        Ok(parent_id.anonymous_child())
    }
1529
1530 fn ensure_child_uid(
1532 &self,
1533 parent_id: &ActorAddr,
1534 uid: crate::id::Uid,
1535 ) -> Result<ActorAddr, anyhow::Error> {
1536 assert_eq!(parent_id.proc_id(), self.proc_id());
1537 let actor_id = ActorId::new(uid.clone(), self.proc_id().clone(), None);
1538 let actor_addr = ActorAddr::new(actor_id, self.default_location());
1539 if !self.state().reserved_child_uids.insert(uid) {
1540 anyhow::bail!("an actor with id {} has already been spawned", actor_addr);
1541 }
1542 Ok(actor_addr)
1543 }
1544
1545 pub(crate) fn allocate_named_child_id(
1547 &self,
1548 parent_id: &ActorAddr,
1549 name: &str,
1550 ) -> Result<ActorAddr, anyhow::Error> {
1551 assert_eq!(parent_id.proc_id(), self.proc_id());
1552 let proc_id = self.proc_id().clone();
1553 let actor_id = crate::id::ActorId::instance(crate::id::Label::strip(name), proc_id);
1554 Ok(ActorAddr::new(actor_id, self.default_location()))
1555 }
1556
1557 pub fn downgrade(&self) -> WeakProc {
1559 WeakProc::new(self)
1560 }
1561
1562 pub async fn flush(&self) -> Result<(), anyhow::Error> {
1565 self.gateway().flush().await
1566 }
1567
1568 pub async fn join_mailbox_server(&self) {
1577 let handle = self.inner.mailbox_server_handle.lock().unwrap().take();
1578 if let Some(handle) = handle {
1579 handle.stop("proc shutting down");
1580 let _ = handle.await;
1581 }
1582 }
1583
1584 pub(crate) fn is_local_delivery_target(&self, dest_proc: &ProcAddr) -> bool {
1585 let local_proc_id = self.proc_id();
1586 if requires_location_for_local_delivery_identity(dest_proc.id()) {
1587 return dest_proc.id() == local_proc_id
1590 && dest_proc.location() == &self.default_location();
1591 }
1592
1593 dest_proc.id() == local_proc_id
1594 }
1595}
1596
1597fn requires_location_for_local_delivery_identity(proc_id: &ProcId) -> bool {
1598 is_legacy_pseudo_singleton_proc_id(proc_id)
1604}
1605
1606fn assert_not_legacy_pseudo_singleton_proc_id(proc_id: &ProcId) {
1607 if is_legacy_pseudo_singleton_proc_id(proc_id) {
1608 panic!(
1609 "legacy pseudo-singleton proc id '{}' must be constructed with a dedicated Proc constructor",
1610 proc_id
1611 );
1612 }
1613}
1614
1615fn is_legacy_pseudo_singleton_proc_id(proc_id: &ProcId) -> bool {
1616 matches!(
1617 proc_id.uid(),
1618 Uid::Singleton(label) if is_legacy_pseudo_singleton_label(label)
1619 )
1620}
1621
1622fn is_legacy_pseudo_singleton_label(label: &Label) -> bool {
1623 matches!(
1624 label.as_str(),
1625 LEGACY_SERVICE_PROC_NAME | LEGACY_LOCAL_PROC_NAME
1626 )
1627}
1628
1629#[async_trait]
1630impl MailboxSender for Proc {
1631 fn post_unchecked(
1632 &self,
1633 envelope: MessageEnvelope,
1634 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1635 ) {
1636 let dest_proc = envelope.dest().actor_addr().proc_addr();
1637 if self.is_local_delivery_target(&dest_proc) {
1638 self.state().proc_muxer.post(envelope, return_handle)
1639 } else {
1640 self.state().gateway.post(envelope, return_handle)
1641 }
1642 }
1643
1644 async fn flush(&self) -> Result<(), anyhow::Error> {
1645 self.gateway().flush().await
1646 }
1647}
1648
1649#[derive(Clone, Debug)]
1651pub struct WeakProc(Weak<ProcState>);
1652
1653impl WeakProc {
1654 fn new(proc: &Proc) -> Self {
1655 Self(Arc::downgrade(&proc.inner))
1656 }
1657
1658 pub fn upgrade(&self) -> Option<Proc> {
1660 self.0.upgrade().map(|inner| Proc { inner })
1661 }
1662}
1663
1664#[async_trait]
1665impl MailboxSender for WeakProc {
1666 fn post_unchecked(
1667 &self,
1668 envelope: MessageEnvelope,
1669 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1670 ) {
1671 match self.upgrade() {
1672 Some(proc) => proc.post(envelope, return_handle),
1673 None => envelope.undeliverable(
1674 DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
1675 return_handle,
1676 ),
1677 }
1678 }
1679
1680 async fn flush(&self) -> Result<(), anyhow::Error> {
1681 match self.upgrade() {
1682 Some(proc) => proc.flush().await,
1683 None => Ok(()),
1684 }
1685 }
1686}
1687
/// A one-shot unit of work to run against an actor and its [`Instance`].
///
/// It wraps a boxed `FnOnce` that receives `&mut` access to the actor and a
/// shared reference to the instance, both for the same lifetime `'a`. The
/// closure returns a boxed `Send` future that may hold those borrows.
pub struct WorkCell<A: Actor + Send>(
    Box<
        dyn for<'a> FnOnce(
                &'a mut A,
                &'a Instance<A>,
            )
                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
            + Send
            + Sync,
    >,
);

impl<A: Actor + Send> WorkCell<A> {
    /// Box `f` into a new work cell.
    fn new(
        f: impl for<'a> FnOnce(
            &'a mut A,
            &'a Instance<A>,
        )
            -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
        + Send
        + Sync
        + 'static,
    ) -> Self {
        Self(Box::new(f))
    }

    /// Consume the cell and call the wrapped closure with `actor` and
    /// `instance`. Returns the future the closure produces.
    pub fn handle<'a>(
        self,
        actor: &'a mut A,
        instance: &'a Instance<A>,
    ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
        (self.0)(actor, instance)
    }
}
1726
1727pub struct Context<'a, A: Actor> {
1729 instance: &'a Instance<A>,
1730 headers: Flattrs,
1731}
1732
1733impl<'a, A: Actor> Context<'a, A> {
1734 pub fn new(instance: &'a Instance<A>, headers: Flattrs) -> Self {
1736 Self { instance, headers }
1737 }
1738
1739 pub fn headers(&self) -> &Flattrs {
1741 &self.headers
1742 }
1743}
1744
1745impl<A: Actor> Deref for Context<'_, A> {
1746 type Target = Instance<A>;
1747
1748 fn deref(&self) -> &Self::Target {
1749 self.instance
1750 }
1751}
1752
1753pub struct Instance<A: Actor> {
1757 inner: Arc<InstanceState<A>>,
1758}
1759
1760impl<A: Actor> fmt::Debug for Instance<A> {
1761 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1762 f.debug_struct("Instance").field("inner", &"..").finish()
1763 }
1764}
1765
/// Shared state behind an [`Instance`].
///
/// Field order is also drop order; keep it stable.
struct InstanceState<A: Actor> {
    /// The proc this instance was created on.
    proc: Proc,

    /// The actor's cell. `Instance::new` builds it from the address, actor
    /// type, actor-loop senders, status receiver, parent, ports and queue
    /// depth counter.
    cell: InstanceCell,

    /// The actor's mailbox. `self_addr` is read from it.
    mailbox: Mailbox,

    /// Handler ports. They are shared with the cell and with every
    /// `ActorHandle` created in `start`.
    ports: Arc<HandlerPorts<A>>,

    /// Posts scheduled through `post_after`, together with their ingress gate.
    delayed_posts: DelayedPosts<A>,

    /// Sender half of the status watch channel. The receiver half is handed
    /// to the cell.
    status_tx: watch::Sender<ActorStatus>,

    /// Per-instance UUID (v7). It is also used to construct `sequencer`.
    id: Uuid,

    /// Sequencer built from `id` via `Sequencer::new`.
    sequencer: Sequencer,

    /// Actor-local storage returned by `Instance::locals`.
    instance_locals: ActorLocalStorage,
}
1793
/// A deferred post. When invoked with an `&Instance<A>`, it performs the send.
/// The impls of `PostAfterEndpoint` pass that instance as the sending context
/// to `Endpoint::post`.
type DelayedPost<A> = Box<dyn FnOnce(&Instance<A>) + Send>;

/// A destination accepted by [`Instance::post_after`].
trait PostAfterEndpoint<A: Actor, M: Message>: Send {
    /// Location of the destination. It is recorded in the lost-message report
    /// if the post cannot be scheduled.
    fn endpoint_location(&self) -> crate::EndpointLocation;

    /// Package this destination and `message` into a [`DelayedPost`] that
    /// posts the message when invoked.
    fn into_delayed_post(self, message: M) -> DelayedPost<A>;
}
1801
// Borrowed instance as destination. A `DelayedPost` is a `'static` boxed
// closure, so it captures an owned instance obtained via `clone_for_py`
// instead of the borrow.
impl<A, M> PostAfterEndpoint<A, M> for &Instance<A>
where
    A: Actor + Handler<M>,
    M: Message,
{
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::EndpointLocation::Actor(self.self_addr().clone())
    }

    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        let dest = self.clone_for_py();
        Box::new(move |this| crate::Endpoint::post(&dest, this, message))
    }
}

// Borrowed handler context as destination. Same as `&Instance`, reaching the
// instance through `Context`'s `Deref`.
impl<A, M> PostAfterEndpoint<A, M> for &Context<'_, A>
where
    A: Actor + Handler<M>,
    M: Message,
{
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::EndpointLocation::Actor(self.self_addr().clone())
    }

    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        let dest = self.clone_for_py();
        Box::new(move |this| crate::Endpoint::post(&dest, this, message))
    }
}

// Owned instance as destination. It is moved straight into the delayed post.
impl<A, M> PostAfterEndpoint<A, M> for Instance<A>
where
    A: Actor + Handler<M>,
    M: Message,
{
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::EndpointLocation::Actor(self.self_addr().clone())
    }

    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(&self, this, message))
    }
}

// Local handle to a (possibly different) actor `B`. Both methods delegate to
// `crate::Endpoint`, passing the handle by reference.
impl<A, B, M> PostAfterEndpoint<A, M> for ActorHandle<B>
where
    A: Actor,
    B: Actor + Handler<M>,
    M: Message,
{
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::Endpoint::endpoint_location(&self)
    }

    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(&self, this, message))
    }
}

// Local multi-use port. It delegates to `crate::Endpoint` by reference.
impl<A, M> PostAfterEndpoint<A, M> for PortHandle<M>
where
    A: Actor,
    M: Message,
{
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::Endpoint::endpoint_location(&self)
    }

    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(&self, this, message))
    }
}

// Local once-port. Unlike the impls above, the port is passed to
// `crate::Endpoint` by value. NOTE(review): presumably the once-port's
// `Endpoint` impl consumes it on send; confirm.
impl<A, M> PostAfterEndpoint<A, M> for OncePortHandle<M>
where
    A: Actor,
    M: Message,
{
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::Endpoint::endpoint_location(self)
    }

    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(self, this, message))
    }
}

// Typed actor reference. It requires `B: RemoteHandles<M>` and
// `M: RemoteMessage`, and delegates to `crate::Endpoint` by reference.
impl<A, B, M> PostAfterEndpoint<A, M> for ActorRef<B>
where
    A: Actor,
    B: Referable + RemoteHandles<M>,
    M: RemoteMessage,
{
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::Endpoint::endpoint_location(&self)
    }

    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(&self, this, message))
    }
}

// Port reference (`M: RemoteMessage`). It delegates to `crate::Endpoint` by
// reference.
impl<A, M> PostAfterEndpoint<A, M> for crate::PortRef<M>
where
    A: Actor,
    M: RemoteMessage,
{
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::Endpoint::endpoint_location(&self)
    }

    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(&self, this, message))
    }
}

// Once-port reference. Like `OncePortHandle`, it is passed to
// `crate::Endpoint` by value.
impl<A, M> PostAfterEndpoint<A, M> for crate::OncePortRef<M>
where
    A: Actor,
    M: RemoteMessage,
{
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::Endpoint::endpoint_location(self)
    }

    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(self, this, message))
    }
}
1931
1932struct DelayedPosts<A: Actor> {
1933 ingress: Arc<DelayedPostIngressGate>,
1934 state: Mutex<DelayedPostState<A>>,
1935 notify: Notify,
1936}
1937
1938struct DelayedPostState<A: Actor> {
1939 queue: BTreeMap<(tokio::time::Instant, u64), DelayedPost<A>>,
1940 next_order: u64,
1941}
1942
1943impl<A: Actor> DelayedPosts<A> {
1944 fn new() -> Self {
1945 Self {
1946 ingress: Arc::new(DelayedPostIngressGate::new()),
1947 state: Mutex::new(DelayedPostState {
1948 queue: BTreeMap::new(),
1949 next_order: 0,
1950 }),
1951 notify: Notify::new(),
1952 }
1953 }
1954
1955 fn push(&self, deadline: tokio::time::Instant, post: DelayedPost<A>) {
1956 let mut state = self.state.lock().unwrap();
1957 let order = state.next_order;
1958 state.next_order = state.next_order.wrapping_add(1);
1959 state.queue.insert((deadline, order), post);
1960 drop(state);
1961 self.notify.notify_one();
1962 }
1963
1964 fn next_deadline(&self) -> Option<tokio::time::Instant> {
1965 self.state
1966 .lock()
1967 .unwrap()
1968 .queue
1969 .keys()
1970 .next()
1971 .map(|(deadline, _)| *deadline)
1972 }
1973
1974 fn pop_due(&self, now: tokio::time::Instant) -> Vec<DelayedPost<A>> {
1975 let mut posts = Vec::new();
1976 let mut state = self.state.lock().unwrap();
1977 while let Some((&(deadline, _), _)) = state.queue.first_key_value() {
1978 if deadline > now {
1979 break;
1980 }
1981 let (_, post) = state.queue.pop_first().expect("delayed post should exist");
1982 posts.push(post);
1983 }
1984 posts
1985 }
1986
1987 fn drain(&self) {
1988 self.ingress.drain();
1989 }
1990
1991 fn is_draining(&self) -> bool {
1992 self.ingress.is_draining()
1993 }
1994}
1995
/// High bit of `DelayedPostIngressGate::state`: set once draining has started.
const DELAYED_POST_INGRESS_DRAINING: usize = !(usize::MAX >> 1);
/// Remaining bits of `DelayedPostIngressGate::state`: count of live guards.
const DELAYED_POST_INGRESS_ACTIVE_MASK: usize = usize::MAX >> 1;

/// Gate admitting `post_after` callers into the delayed-post queue.
///
/// `try_enter` hands out guards until `drain` is called. `drain` then blocks
/// the calling thread until every outstanding guard has been dropped.
struct DelayedPostIngressGate {
    /// Draining flag (high bit) plus active-guard count (remaining bits).
    state: AtomicUsize,
    /// Lock paired with `drained`. It protects no data of its own.
    wait_lock: Mutex<()>,
    /// Signalled when the last guard leaves a draining gate.
    drained: Condvar,
}

/// Token for one admitted caller. Dropping it leaves the gate.
struct DelayedPostIngressGuard {
    gate: Arc<DelayedPostIngressGate>,
}

impl DelayedPostIngressGate {
    fn new() -> Self {
        DelayedPostIngressGate {
            state: AtomicUsize::new(0),
            wait_lock: Mutex::new(()),
            drained: Condvar::new(),
        }
    }

    /// Enter the gate, or return `Err(())` if draining has started.
    ///
    /// # Panics
    ///
    /// Panics if the active count would overflow into the draining bit.
    fn try_enter(self: &Arc<Self>) -> Result<DelayedPostIngressGuard, ()> {
        self.state
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                if current & DELAYED_POST_INGRESS_DRAINING != 0 {
                    return None;
                }
                assert!(
                    current & DELAYED_POST_INGRESS_ACTIVE_MASK < DELAYED_POST_INGRESS_ACTIVE_MASK,
                    "too many active delayed post sends"
                );
                Some(current + 1)
            })
            .map(|_| DelayedPostIngressGuard {
                gate: Arc::clone(self),
            })
            .map_err(|_| ())
    }

    /// Stop admitting new callers, then block until all active guards drop.
    /// Calling it again is harmless.
    fn drain(&self) {
        self.state
            .fetch_or(DELAYED_POST_INGRESS_DRAINING, Ordering::AcqRel);
        let locked = self.wait_lock.lock().unwrap();
        let _unlocked = self
            .drained
            .wait_while(locked, |_| {
                self.state.load(Ordering::Acquire) & DELAYED_POST_INGRESS_ACTIVE_MASK != 0
            })
            .unwrap();
    }

    /// Whether `drain` has been started.
    fn is_draining(&self) -> bool {
        self.state.load(Ordering::Acquire) & DELAYED_POST_INGRESS_DRAINING != 0
    }
}

impl Drop for DelayedPostIngressGuard {
    fn drop(&mut self) {
        let gate = &self.gate;
        let before = gate.state.fetch_sub(1, Ordering::AcqRel);
        let active_before = before & DELAYED_POST_INGRESS_ACTIVE_MASK;
        assert!(
            active_before != 0,
            "delayed post ingress active count underflow"
        );
        let draining = before & DELAYED_POST_INGRESS_DRAINING != 0;
        if draining && active_before == 1 {
            // Taking the lock ensures the wakeup cannot land between the
            // drainer's condition check and its wait.
            let _lock = gate.wait_lock.lock().unwrap();
            gate.drained.notify_all();
        }
    }
}
2090
2091impl<A: Actor> InstanceState<A> {
2092 fn self_addr(&self) -> &ActorAddr {
2093 self.mailbox.actor_addr()
2094 }
2095}
2096
2097impl<A: Actor> Drop for InstanceState<A> {
2098 fn drop(&mut self) {
2099 self.status_tx.send_if_modified(|status| {
2100 if status.is_terminal() {
2101 false
2102 } else {
2103 tracing::info!(
2104 name = "ActorStatus",
2105 actor_id = %self.self_addr(),
2106 actor_name = self.self_addr().log_name(),
2107 status = "Stopped",
2108 prev_status = status.arm().unwrap_or("unknown"),
2109 "instance is dropped",
2110 );
2111 *status = ActorStatus::Stopped("instance is dropped".into());
2112 true
2113 }
2114 });
2115 }
2116}
2117
/// Receiving halves created by `Instance::new` and consumed by `start`.
pub struct InstanceReceivers<A: Actor> {
    /// Signal and supervision-event receivers for the actor loop. `None` for
    /// detached instances; `start` requires it to be present.
    actor_loop: Option<(
        PortReceiver<Signal>,
        mpsc::UnboundedReceiver<ActorSupervisionEvent>,
    )>,
    /// Receiver for runtime work items ([`WorkCell`]s).
    work: mpsc::UnboundedReceiver<WorkCell<A>>,
    /// Receiver for introspection requests. `start` hands it to
    /// `serve_introspect`.
    introspect: PortReceiver<IntrospectMessage>,
}
2136
2137impl<A: Actor> Instance<A> {
    /// Create an instance addressed by `actor_id` on `proc`.
    ///
    /// Returns the instance together with the receivers its run loop will
    /// consume. When `detached` is true, no signal port or supervision channel
    /// is created, and `InstanceReceivers::actor_loop` is `None`.
    ///
    /// The ordering of port opening and binding below is left as-is. It may
    /// matter for port allocation; do not reorder without checking.
    fn new(
        proc: Proc,
        actor_id: ActorAddr,
        detached: bool,
        parent: Option<InstanceCell>,
    ) -> (Self, InstanceReceivers<A>) {
        let mailbox = Mailbox::new(actor_id.clone());
        // Work queue. The flag read from config controls the dest-actor
        // reordering buffer.
        let (work_tx, work_rx) = ordered_channel(
            actor_id.to_string(),
            hyperactor_config::global::get(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER),
        );
        // Queue-depth counter, shared by the handler ports and the cell.
        let queue_depth = Arc::new(AtomicU64::new(0));
        let proc_stats = Arc::clone(&proc.state().queue_stats);
        let ports: Arc<HandlerPorts<A>> = Arc::new(HandlerPorts::new(
            mailbox.clone(),
            work_tx,
            Arc::clone(&queue_depth),
            proc_stats,
        ));
        proc.state().proc_muxer.bind_mailbox(mailbox.clone());
        let (status_tx, status_rx) = watch::channel(ActorStatus::Created);

        let actor_type = match TypeInfo::of::<A>() {
            Some(info) => ActorType::Named(info),
            None => ActorType::Anonymous(std::any::type_name::<A>()),
        };
        let actor_loop_ports = if detached {
            None
        } else {
            let (signal_port, signal_receiver) = mailbox.open_port::<Signal>();
            let (supervision_tx, supervision_receiver) =
                mpsc::unbounded_channel::<ActorSupervisionEvent>();
            Some((
                (signal_port, supervision_tx),
                (signal_receiver, supervision_receiver),
            ))
        };

        // Split into the sending halves (passed to the cell) and the receiving
        // halves (returned to the caller).
        let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();

        let (introspect_port, introspect_receiver) = mailbox.open_port::<IntrospectMessage>();
        introspect_port.bind_handler_port();

        let cell = InstanceCell::new(
            actor_id,
            actor_type,
            proc.clone(),
            actor_loop,
            status_rx,
            parent,
            ports.clone(),
            queue_depth,
        );
        let instance_id = Uuid::now_v7();
        let inner = Arc::new(InstanceState {
            proc,
            cell,
            mailbox,
            ports,
            delayed_posts: DelayedPosts::new(),
            status_tx,
            sequencer: Sequencer::new(instance_id),
            id: instance_id,
            instance_locals: ActorLocalStorage::new(),
        });
        (
            Self { inner },
            InstanceReceivers {
                actor_loop: actor_loop_receivers,
                work: work_rx,
                introspect: introspect_receiver,
            },
        )
    }
2220
2221 #[track_caller]
2224 pub fn change_status(&self, new: ActorStatus) {
2225 let old = self.inner.status_tx.send_replace(new.clone());
2226 assert!(
2232 !old.is_terminal() && !new.is_terminal()
2233 || !old.is_terminal() && new.is_terminal()
2234 || old == new,
2235 "actor changing status illegally, only allow non-terminal -> non-terminal \
2236 and non-terminal -> terminal statuses. actor_id={}, prev_status={}, status={}",
2237 self.self_addr(),
2238 old,
2239 new
2240 );
2241 if !((old.is_idle() && new.is_processing())
2246 || (old.is_processing() && new.is_idle())
2247 || old == new)
2248 {
2249 let new_status = new.arm().unwrap_or("unknown");
2250 let change_reason = match &new {
2251 ActorStatus::Failed(reason) => reason.to_string(),
2252 ActorStatus::Stopped(reason) => reason.clone(),
2253 _ => "".to_string(),
2254 };
2255 tracing::info!(
2256 name = "ActorStatus",
2257 actor_id = %self.self_addr(),
2258 actor_name = self.self_addr().log_name(),
2259 status = new_status,
2260 prev_status = old.arm().unwrap_or("unknown"),
2261 caller = %PanicLocation::caller(),
2262 change_reason,
2263 );
2264 let actor_id = hash_to_u64(self.self_addr());
2265 notify_actor_status_changed(ActorStatusEvent {
2266 id: generate_actor_status_event_id(actor_id),
2267 timestamp: std::time::SystemTime::now(),
2268 actor_id,
2269 new_status: new_status.to_string(),
2270 reason: if change_reason.is_empty() {
2271 None
2272 } else {
2273 Some(change_reason)
2274 },
2275 });
2276 }
2277 }
2278
2279 fn is_terminal(&self) -> bool {
2280 self.inner.status_tx.borrow().is_terminal()
2281 }
2282
2283 fn is_stopping(&self) -> bool {
2284 self.inner.status_tx.borrow().is_stopping()
2285 }
2286
2287 pub fn self_addr(&self) -> &ActorAddr {
2289 self.inner.self_addr()
2290 }
2291
2292 pub(crate) fn report_lost_message(&self, lost: crate::mailbox::LostMessage) {
2294 static REPORT_LOST_WARNED_MAILBOXES: OnceLock<DashSet<ActorAddr>> = OnceLock::new();
2295
2296 let mailbox = &self.inner.mailbox;
2297 let return_handle = mailbox.bound_return_handle().unwrap_or_else(|| {
2298 let actor_id = mailbox.actor_addr();
2299 if REPORT_LOST_WARNED_MAILBOXES
2300 .get_or_init(DashSet::new)
2301 .insert(actor_id.clone())
2302 {
2303 let bt = std::backtrace::Backtrace::force_capture();
2304 tracing::warn!(
2305 actor_id = ?actor_id,
2306 backtrace = ?bt,
2307 "actor attempted to report a lost message without binding Undeliverable<MessageEnvelope>"
2308 );
2309 }
2310 crate::mailbox::monitored_return_handle()
2311 });
2312
2313 if let Err(error) =
2314 return_handle.try_send(self, crate::mailbox::Undeliverable::lost(lost.clone()))
2315 {
2316 tracing::error!(
2317 sender = %lost.sender,
2318 dest = %lost.dest,
2319 message_type = lost.message_type.as_deref().unwrap_or("unknown"),
2320 error = %lost.error,
2321 return_error = %error,
2322 "lost message could not be reported"
2323 );
2324 }
2325 }
2326
2327 pub fn introspect_payload(&self) -> crate::introspect::IntrospectResult {
2343 crate::introspect::live_actor_payload(&self.inner.cell)
2344 }
2345
2346 pub fn recording_span(&self) -> tracing::Span {
2350 use crate::subject::AsSubject;
2351 self.inner
2352 .cell
2353 .recording()
2354 .span(&self.self_addr().subject().to_string())
2355 }
2356
2357 pub fn publish_attrs(&self, attrs: hyperactor_config::Attrs) {
2365 #[cfg(debug_assertions)]
2366 {
2367 use std::collections::HashSet;
2368 use std::sync::OnceLock;
2369
2370 use hyperactor_config::attrs::AttrKeyInfo;
2371
2372 static INTROSPECT_KEYS: OnceLock<HashSet<&'static str>> = OnceLock::new();
2373 let allowed = INTROSPECT_KEYS.get_or_init(|| {
2374 inventory::iter::<AttrKeyInfo>()
2375 .filter(|info| info.meta.get(hyperactor_config::INTROSPECT).is_some())
2376 .map(|info| info.name)
2377 .collect()
2378 });
2379 for (name, _) in attrs.iter() {
2380 debug_assert!(
2381 allowed.contains(name),
2382 "publish_attrs: key {:?} is not tagged with INTROSPECT",
2383 name
2384 );
2385 }
2386 }
2387 self.inner.cell.set_published_attrs(attrs);
2388 }
2389
2390 pub fn publish_attr<T: hyperactor_config::AttrValue>(
2396 &self,
2397 key: hyperactor_config::Key<T>,
2398 value: T,
2399 ) {
2400 debug_assert!(
2401 key.attrs().get(hyperactor_config::INTROSPECT).is_some(),
2402 "publish_attr called with non-introspection key: {}",
2403 key.name()
2404 );
2405 self.inner.cell.merge_published_attr(key, value);
2406 }
2407
2408 pub fn set_system(&self) {
2411 self.inner
2412 .cell
2413 .inner
2414 .is_system
2415 .store(true, Ordering::Relaxed);
2416 }
2417
2418 pub fn set_query_child_handler(
2427 &self,
2428 handler: impl (Fn(&Addr) -> IntrospectResult) + Send + Sync + 'static,
2429 ) {
2430 self.inner.cell.set_query_child_handler(handler);
2431 }
2432
2433 pub fn stop(&self, reason: &str) -> Result<(), ActorError> {
2435 tracing::info!(
2436 actor_id = %self.inner.cell.actor_addr(),
2437 reason,
2438 "instance stop called",
2439 );
2440 self.inner.cell.signal(Signal::Stop(reason.to_string()))
2441 }
2442
2443 pub fn drain_and_stop(&self, reason: &str) -> Result<(), ActorError> {
2445 tracing::info!(
2446 actor_id = %self.inner.cell.actor_addr(),
2447 reason,
2448 "instance drain_and_stop called",
2449 );
2450 self.inner
2451 .cell
2452 .signal(Signal::DrainAndStop(reason.to_string()))
2453 }
2454
2455 pub fn kill(&self, reason: &str) -> Result<(), ActorError> {
2457 tracing::info!(
2458 actor_id = %self.inner.cell.actor_addr(),
2459 reason,
2460 "instance kill called",
2461 );
2462 self.inner.cell.signal(Signal::Kill(reason.to_string()))
2463 }
2464
2465 pub fn abort(&self, reason: &str) -> Result<(), ActorError> {
2467 tracing::info!(
2468 actor_id = %self.inner.cell.actor_addr(),
2469 reason,
2470 "instance abort called",
2471 );
2472 self.kill(reason)
2473 }
2474
2475 pub fn close(&self) {
2477 self.inner.delayed_posts.drain();
2478 self.inner.mailbox.drain();
2479 }
2480
2481 pub fn exit(&self, reason: &str) -> Result<(), ActorError> {
2483 self.inner
2484 .cell
2485 .signal(Signal::ExitRequested(reason.to_string()))
2486 }
2487
2488 pub fn exit_after_drain(&self, reason: &str) -> Result<(), ActorError> {
2495 let this = self.clone_for_py();
2496 let reason = reason.to_string();
2497 let work = WorkCell::new(move |_actor: &mut A, _instance: &Instance<A>| {
2498 Box::pin(async move {
2499 this.exit(&reason).map_err(anyhow::Error::from)?;
2500 Ok(())
2501 })
2502 });
2503 self.enqueue_runtime_work(work)
2504 }
2505
2506 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
2511 self.inner.mailbox.open_port()
2512 }
2513
2514 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
2518 self.inner.mailbox.open_once_port()
2519 }
2520
2521 #[doc(hidden)]
2523 pub fn signal_port(&self) -> PortHandle<Signal> {
2524 self.inner.cell.signal_port()
2525 }
2526
2527 pub fn locals(&self) -> &ActorLocalStorage {
2529 &self.inner.instance_locals
2530 }
2531
2532 pub fn post(&self, port_id: impl Into<PortAddr>, headers: Flattrs, message: wirevalue::Any) {
2534 let port_id: PortAddr = port_id.into();
2535 <Self as context::MailboxExt>::post(
2536 self,
2537 port_id,
2538 headers,
2539 message,
2540 true,
2541 context::SeqInfoPolicy::AssignNew,
2542 )
2543 }
2544
2545 #[doc(hidden)]
2551 pub fn post_with_external_seq_info(
2552 &self,
2553 port_id: impl Into<PortAddr>,
2554 headers: Flattrs,
2555 message: wirevalue::Any,
2556 ) {
2557 <Self as context::MailboxExt>::post(
2558 self,
2559 port_id.into(),
2560 headers,
2561 message,
2562 true,
2563 context::SeqInfoPolicy::AllowExternal,
2564 )
2565 }
2566
2567 fn enqueue_runtime_work(&self, work: WorkCell<A>) -> Result<(), ActorError> {
2568 let actor_id_str = self.self_addr().to_string();
2569 account_enqueue(
2570 &self.inner.cell.inner.queue_depth,
2571 &self.inner.proc.state().queue_stats,
2572 &actor_id_str,
2573 );
2574 let result = self
2575 .inner
2576 .ports
2577 .workq
2578 .direct_send(work)
2579 .map_err(anyhow::Error::from);
2580 if result.is_err() {
2581 account_cancel_enqueue(
2582 &self.inner.cell.inner.queue_depth,
2583 &self.inner.proc.state().queue_stats,
2584 &actor_id_str,
2585 );
2586 }
2587 result.map_err(|err| ActorError::new(self.self_addr(), ActorErrorKind::processing(err)))
2588 }
2589
2590 pub fn self_client() -> &'static Instance<()> {
2596 static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
2597 &CLIENT
2598 .get_or_init(|| Proc::runtime().client("self_message_client").unwrap())
2599 .0
2600 }
2601
2602 #[allow(private_bounds)]
2608 pub fn post_after<D, M>(&self, dest: D, message: M, delay: Duration)
2609 where
2610 M: Message,
2611 D: PostAfterEndpoint<A, M>,
2612 {
2613 let dest_location = dest.endpoint_location();
2614 if matches!(*self.inner.status_tx.borrow(), ActorStatus::Client) {
2615 self.report_lost_message(crate::mailbox::LostMessage {
2616 sender: self.mailbox().actor_addr().clone(),
2617 dest: dest_location,
2618 message_type: Some(std::any::type_name::<M>().to_string()),
2619 error: "delayed posts require an actor runtime".to_string(),
2620 });
2621 return;
2622 }
2623 let Ok(_guard) = self.inner.delayed_posts.ingress.try_enter() else {
2624 self.report_lost_message(crate::mailbox::LostMessage {
2625 sender: self.mailbox().actor_addr().clone(),
2626 dest: dest_location,
2627 message_type: Some(std::any::type_name::<M>().to_string()),
2628 error: "actor runtime is stopping".to_string(),
2629 });
2630 return;
2631 };
2632 if self.is_stopping() || self.is_terminal() {
2633 self.report_lost_message(crate::mailbox::LostMessage {
2634 sender: self.mailbox().actor_addr().clone(),
2635 dest: dest_location,
2636 message_type: Some(std::any::type_name::<M>().to_string()),
2637 error: "actor runtime is stopping".to_string(),
2638 });
2639 return;
2640 }
2641
2642 self.inner.delayed_posts.push(
2643 tokio::time::Instant::now() + delay,
2644 dest.into_delayed_post(message),
2645 );
2646 }
2647
    /// Start the actor and return a handle to it.
    ///
    /// This spawns the introspection server, spawns the actor's main task
    /// (`serve`, wrapped with backtrace tracking and the current span) via
    /// `A::spawn_server_task`, and records that task's handle on the cell.
    ///
    /// # Panics
    ///
    /// Panics if the instance has no actor-loop receivers (it was created
    /// detached). Also panics if storing the task handle on the cell fails,
    /// for example because one was already set.
    fn start(self, actor: A, receivers: InstanceReceivers<A>) -> ActorHandle<A> {
        let instance_cell = self.inner.cell.clone();
        let actor_id = self.inner.cell.actor_addr().clone();
        let actor_handle = ActorHandle::new(self.inner.cell.clone(), self.inner.ports.clone());

        // Introspection runs on its own tokio task; its JoinHandle is not kept.
        tokio::spawn(crate::introspect::serve_introspect(
            self.inner.cell.clone(),
            receivers.introspect,
        ));

        let actor_loop_receivers = receivers
            .actor_loop
            .expect("non-detached instance must have actor loop receivers");
        // `self` is moved into `serve`; everything needed afterwards was
        // cloned out above.
        let actor_task_handle = A::spawn_server_task(
            panic_handler::with_backtrace_tracking(self.serve(
                actor,
                actor_loop_receivers,
                receivers.work,
            ))
            .instrument(Span::current()),
        );
        tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
        instance_cell
            .inner
            .actor_task_handle
            .set(actor_task_handle)
            .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));

        actor_handle
    }
2683
    /// Body of the actor's main task.
    ///
    /// Steps:
    /// 1. Run the actor tree.
    /// 2. Derive the terminal status and a supervision event from the outcome.
    /// 3. Close the mailbox with that status and record the event on the cell.
    /// 4. Route the event to the parent if still linked, otherwise to the
    ///    proc's unhandled-supervision handler.
    /// 5. Tell the parent `ChildStopped`.
    /// 6. Publish the terminal status last.
    async fn serve(
        mut self,
        mut actor: A,
        actor_loop_receivers: (
            PortReceiver<Signal>,
            mpsc::UnboundedReceiver<ActorSupervisionEvent>,
        ),
        mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
    ) {
        let result = self
            .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
            .await;

        // `run_actor_tree` always leaves the status at `Stopping`, except when
        // it panics.
        assert!(self.is_stopping());
        let (terminal_status, event) = match result {
            Ok(stop_reason) => {
                let status = ActorStatus::Stopped(stop_reason);
                let event = ActorSupervisionEvent::new(
                    self.inner.cell.actor_addr().clone(),
                    actor.display_name(),
                    status.clone(),
                    None,
                );
                (status, Some(event))
            }
            Err(err) => match *err.kind {
                // An unhandled supervision event becomes this actor's outcome:
                // its (terminal) status and the event are propagated unchanged.
                ActorErrorKind::UnhandledSupervisionEvent(box event) => {
                    assert!(event.actor_status.is_terminal());
                    let status = event.actor_status.clone();
                    (status, Some(event))
                }
                // Any other error is flattened into a generic failure.
                _ => {
                    let error_kind = ActorErrorKind::Generic(err.kind.to_string());
                    let status = ActorStatus::Failed(error_kind);
                    let event = ActorSupervisionEvent::new(
                        self.inner.cell.actor_addr().clone(),
                        actor.display_name(),
                        status.clone(),
                        None,
                    );
                    (status, Some(event))
                }
            },
        };

        self.mailbox().close(terminal_status.clone());
        if let Some(event) = &event {
            *self.inner.cell.inner.supervision_event.lock().unwrap() = Some(event.clone());
        }

        if let Some(parent) = self.inner.cell.maybe_unlink_parent() {
            if let Some(event) = event {
                parent.send_supervision_event_or_crash(event);
            }
            if let Err(err) = parent.signal(Signal::ChildStopped(self.inner.cell.uid().clone())) {
                tracing::error!(
                    "{}: failed to send stop message to parent uid {}: {:?}",
                    self.self_addr(),
                    parent.uid(),
                    err
                );
            }
        } else {
            // No parent (root actor, or already unlinked): the proc handles it.
            if let Some(event) = event {
                self.inner
                    .proc
                    .handle_unhandled_supervision_event(&self, event);
            }
        }

        self.change_status(terminal_status);
    }
2775
    /// Run the actor to completion, then tear down its children and run cleanup.
    ///
    /// Steps:
    /// 1. Run `run`, catching panics.
    /// 2. Move the status to `Stopping`.
    /// 3. Send `Signal::Stop` to every child, unlinking any child that cannot
    ///    be signalled.
    /// 4. Wait for the remaining children to report `ChildStopped`.
    /// 5. Call `actor.cleanup`, bounded by `CLEANUP_TIMEOUT`. This step is
    ///    skipped after a panic.
    ///
    /// Returns the stop reason on success. An error from the actor itself
    /// takes precedence over a cleanup error, which is then only logged.
    async fn run_actor_tree(
        &mut self,
        actor: &mut A,
        mut actor_loop_receivers: (
            PortReceiver<Signal>,
            mpsc::UnboundedReceiver<ActorSupervisionEvent>,
        ),
        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
    ) -> Result<String, ActorError> {
        let mut did_panic = false;
        let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers, work_rx))
            .catch_unwind()
            .await
        {
            Ok(result) => result,
            Err(_) => {
                did_panic = true;
                let panic_info = panic_handler::take_panic_info()
                    .map(|info| info.to_string())
                    .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
                Err(ActorError::new(
                    self.self_addr(),
                    ActorErrorKind::panic(anyhow::anyhow!(panic_info)),
                ))
            }
        };

        assert!(!self.is_terminal());
        self.change_status(ActorStatus::Stopping);
        if let Err(err) = &result {
            tracing::error!("{}: actor failure: {}", self.self_addr(), err);
        }

        // Children that could not be signalled are collected and unlinked
        // after the iteration finishes.
        let mut to_unlink = Vec::new();
        for child in self.inner.cell.child_iter() {
            if let Err(err) = child
                .value()
                .signal(Signal::Stop("parent stopping".to_string()))
            {
                tracing::error!(
                    "{}: failed to send stop signal to child pid {}: {:?}",
                    self.self_addr(),
                    child.key(),
                    err
                );
                to_unlink.push(child.value().clone());
            }
        }
        for child in to_unlink {
            self.inner.cell.unlink(&child);
        }

        // Each `recv` gets its own 500ms budget. If any single wait times out,
        // all remaining children are unlinked and the wait is abandoned.
        // Signals other than `ChildStopped` received here are discarded.
        // NOTE(review): a `recv` error returns early via `signal?`. That skips
        // cleanup and replaces any error already in `result`; confirm this is
        // intended.
        let (mut signal_receiver, _) = actor_loop_receivers;
        while self.inner.cell.child_count() > 0 {
            match tokio::time::timeout(Duration::from_millis(500), signal_receiver.recv()).await {
                Ok(signal) => {
                    // The child unlinks itself (`maybe_unlink_parent` in its
                    // `serve`) before sending `ChildStopped`.
                    if let Signal::ChildStopped(uid) = signal? {
                        assert!(self.inner.cell.get_child(&uid).is_none());
                    }
                }
                Err(_) => {
                    tracing::warn!(
                        "timeout waiting for ChildStopped signal from child on actor: {}, ignoring",
                        self.self_addr()
                    );
                    self.inner.cell.unlink_all();
                    break;
                }
            }
        }
        // Cleanup is skipped after a panic; the actor's state may be inconsistent.
        let cleanup_result = if !did_panic {
            let cleanup_timeout = hyperactor_config::global::get(config::CLEANUP_TIMEOUT);
            match tokio::time::timeout(cleanup_timeout, actor.cleanup(self, result.as_ref().err()))
                .await
            {
                Ok(Ok(x)) => Ok(x),
                Ok(Err(e)) => Err(ActorError::new(
                    self.self_addr(),
                    ActorErrorKind::cleanup(e),
                )),
                // Cleanup timed out.
                Err(e) => Err(ActorError::new(
                    self.self_addr(),
                    ActorErrorKind::cleanup(e.into()),
                )),
            }
        } else {
            Ok(())
        };
        if let Err(ref actor_err) = result {
            if let Err(ref err) = cleanup_result {
                tracing::warn!(
                    cleanup_err = %err,
                    %actor_err,
                    "ignoring cleanup error after actor error",
                );
            }
        }
        // The actor's own error wins; otherwise surface any cleanup error.
        result.and_then(|reason| cleanup_result.map(|_| reason))
    }
2897
2898 async fn run(
2902 &mut self,
2903 actor: &mut A,
2904 actor_loop_receivers: &mut (
2905 PortReceiver<Signal>,
2906 mpsc::UnboundedReceiver<ActorSupervisionEvent>,
2907 ),
2908 work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
2909 ) -> Result<String, ActorError> {
2910 let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;
2911
2912 self.change_status(ActorStatus::Initializing);
2913 actor
2914 .init(self)
2915 .await
2916 .map_err(|err| ActorError::new(self.self_addr(), ActorErrorKind::init(err)))?;
2917 let actor_id_str = self.self_addr().to_string();
2918 let stop_reason = 'messages: loop {
2919 if !self.is_stopping() {
2920 self.change_status(ActorStatus::Idle);
2921 }
2922 let next_delayed_deadline = self.inner.delayed_posts.next_deadline();
2923 let metric_pairs = hyperactor_telemetry::kv_pairs!("actor_id" => actor_id_str.clone());
2924 tokio::select! {
2925 biased;
2926 signal = signal_receiver.recv() => {
2927 let signal = signal.map_err(ActorError::from);
2928 tracing::debug!("received signal {signal:?}");
2929 match signal? {
2930 Signal::Stop(reason) => {
2931 self.change_status(ActorStatus::Stopping);
2932 actor
2933 .handle_stop(self, StopMode::Stop, &reason)
2934 .await
2935 .map_err(|err| ActorError::new(self.self_addr(), ActorErrorKind::processing(err)))?;
2936 },
2937 Signal::DrainAndStop(reason) => {
2938 self.change_status(ActorStatus::Stopping);
2939 actor
2940 .handle_stop(self, StopMode::DrainAndStop, &reason)
2941 .await
2942 .map_err(|err| ActorError::new(self.self_addr(), ActorErrorKind::processing(err)))?;
2943 },
2944 Signal::ChildStopped(uid) => {
2945 assert!(self.inner.cell.get_child(&uid).is_none());
2946 },
2947 Signal::ExitRequested(reason) => {
2948 break 'messages reason;
2949 }
2950 Signal::Kill(reason) => {
2951 return Err(ActorError { actor_id: Box::new(self.self_addr().clone()), kind: Box::new(ActorErrorKind::Aborted(reason)) });
2952 }
2953 }
2954 }
2955 work = work_rx.recv() => {
2956 ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
2957 account_dequeue(&self.inner.cell.inner.queue_depth, &self.inner.proc.state().queue_stats, &actor_id_str);
2958 let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
2959 let work = work.expect("inconsistent work queue state");
2960 if let Err(err) = work.handle(actor, self).await {
2961 while let Ok(supervision_event) = supervision_event_receiver.try_recv() {
2962 self.handle_supervision_event(actor, supervision_event).await?;
2963 }
2964 let kind = ActorErrorKind::processing(err);
2965 return Err(ActorError {
2966 actor_id: Box::new(self.self_addr().clone()),
2967 kind: Box::new(kind),
2968 });
2969 }
2970 }
2971 _ = self.inner.delayed_posts.notify.notified(), if !self.is_stopping() && !self.inner.delayed_posts.is_draining() => {
2972 }
2973 _ = async {
2974 match next_delayed_deadline {
2975 Some(deadline) => tokio::time::sleep_until(deadline).await,
2976 None => std::future::pending::<()>().await,
2977 }
2978 }, if !self.is_stopping() && !self.inner.delayed_posts.is_draining() && next_delayed_deadline.is_some() => {
2979 let now = tokio::time::Instant::now();
2980 if let Ok(_guard) = self.inner.delayed_posts.ingress.try_enter() {
2981 for post in self.inner.delayed_posts.pop_due(now) {
2982 post(self);
2983 }
2984 }
2985 }
2986 Some(supervision_event) = supervision_event_receiver.recv() => {
2987 self.handle_supervision_event(actor, supervision_event).await?;
2988 }
2989 }
2990 self.inner
2991 .cell
2992 .inner
2993 .num_processed_messages
2994 .fetch_add(1, Ordering::SeqCst);
2995 };
2996 tracing::debug!(
2997 actor_id = %self.self_addr(),
2998 reason = stop_reason,
2999 "exited actor loop",
3000 );
3001 Ok(stop_reason)
3002 }
3003
3004 pub async fn handle_supervision_event(
3006 &self,
3007 actor: &mut A,
3008 supervision_event: ActorSupervisionEvent,
3009 ) -> Result<(), ActorError> {
3010 match actor
3012 .handle_supervision_event(self, &supervision_event)
3013 .await
3014 {
3015 Ok(true) => {
3016 Ok(())
3018 }
3019 Ok(false) => {
3020 let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
3021 Err(ActorError::new(self.self_addr(), kind))
3022 }
3023 Err(err) => {
3024 let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
3027 err.to_string(),
3028 Box::new(supervision_event),
3029 );
3030 Err(ActorError::new(self.self_addr(), kind))
3031 }
3032 }
3033 }
3034
    /// Dispatches `message` to the actor's `Handler<M>` implementation.
    ///
    /// First derives the [`HandlerInfo`] and the endpoint name used for status
    /// reporting and telemetry. When `type_info` is `None`, the handler info
    /// falls back to `std::any::type_name::<M>()` with no arm, and no endpoint
    /// is reported.
    ///
    /// # Safety
    ///
    /// `info.arm_unchecked` and `info.endpoint_name` receive a type-erased
    /// pointer to `message`. The caller must ensure that `type_info`, when
    /// `Some`, describes the type `M`. NOTE(review): this contract is inferred
    /// from the erased-pointer calls. `HandlerPorts::get` satisfies it by
    /// looking `type_info` up from `TypeId::of::<M>()`.
    async unsafe fn handle_message<M: Message>(
        &self,
        actor: &mut A,
        type_info: Option<&'static TypeInfo>,
        headers: Flattrs,
        message: M,
    ) -> Result<(), anyhow::Error>
    where
        A: Handler<M>,
    {
        let handler_info = match type_info {
            Some(info) => {
                // SAFETY: the caller guarantees `info` describes `M`, and the
                // pointer refers to `message`, which is live and not yet moved.
                let arm = unsafe { info.arm_unchecked(&message as *const M as *const ()) };
                HandlerInfo::from_static(info.typename(), arm)
            }
            None => {
                HandlerInfo::from_static(std::any::type_name::<M>(), None)
            }
        };

        let endpoint = type_info.and_then(|info| {
            // SAFETY: same contract as above; `message` is still in place here.
            unsafe { info.endpoint_name(&message as *const M as *const ()) }
        });

        self.handle_message_with_handler_info(actor, handler_info, headers, message, endpoint)
            .await
    }
3067
    /// Runs the actor's `Handler<M>` for `message`, wrapped in status updates,
    /// telemetry and accounting.
    ///
    /// In order, this:
    /// - sets the status to `ActorStatus::Processing` with the current time and
    ///   `handler_info`, and calls `log_message_latency_if_sampling` on the
    ///   headers;
    /// - if the headers carry `TELEMETRY_MESSAGE_ID`, emits a `MessageEvent` and
    ///   an `"active"` message-status event;
    /// - stores `handler_info` as this cell's last message handler;
    /// - runs the handler inside this instance's recording span;
    /// - adds the elapsed time (measured with `Instant`), in microseconds, to
    ///   `total_processing_time_us`;
    /// - emits a `"complete"` status event when a message id is present,
    ///   whether or not the handler succeeded.
    ///
    /// Returns the handler's result unchanged.
    #[tracing::instrument(level = "debug", name = "handle_message", skip_all, fields(message_type = %handler_info))]
    async fn handle_message_with_handler_info<M: Message>(
        &self,
        actor: &mut A,
        handler_info: HandlerInfo,
        headers: Flattrs,
        message: M,
        endpoint: Option<String>,
    ) -> Result<(), anyhow::Error>
    where
        A: Handler<M>,
    {
        let now = std::time::SystemTime::now();
        let handler_info = Some(handler_info);
        self.change_status(ActorStatus::Processing(now, handler_info.clone()));
        crate::mailbox::headers::log_message_latency_if_sampling(
            &headers,
            self.self_addr().to_string(),
        );

        // Telemetry events are emitted only for messages tagged with an id.
        let message_id = headers.get(crate::mailbox::headers::TELEMETRY_MESSAGE_ID);

        if let Some(message_id) = message_id {
            // A missing sender hash is reported as 0.
            let from_actor_id = headers
                .get(crate::mailbox::headers::SENDER_ACTOR_ID_HASH)
                .unwrap_or(0);
            let to_actor_id = hash_to_u64(self.self_addr());
            let port_id = headers.get(crate::mailbox::headers::TELEMETRY_PORT_ID);

            notify_message(hyperactor_telemetry::MessageEvent {
                timestamp: now,
                id: message_id,
                from_actor_id,
                to_actor_id,
                endpoint,
                port_id,
            });

            notify_message_status(hyperactor_telemetry::MessageStatusEvent {
                timestamp: now,
                id: hyperactor_telemetry::generate_status_event_id(message_id),
                message_id,
                status: "active".to_string(),
            });
        }

        *self.inner.cell.inner.last_message_handler.write().unwrap() = handler_info;

        let context = Context::new(self, headers);
        let start = Instant::now();
        let subject_str = self.self_addr().subject().to_string();
        let result = actor
            .handle(&context, message)
            .instrument(self.inner.cell.inner.recording.span(&subject_str))
            .await;
        // u128 -> u64; truncation would need an implausibly long handler.
        let elapsed_us = start.elapsed().as_micros() as u64;
        self.inner
            .cell
            .inner
            .total_processing_time_us
            .fetch_add(elapsed_us, Ordering::SeqCst);

        if let Some(message_id) = message_id {
            notify_message_status(hyperactor_telemetry::MessageStatusEvent {
                timestamp: std::time::SystemTime::now(),
                id: hyperactor_telemetry::generate_status_event_id(message_id),
                message_id,
                status: "complete".to_string(),
            });
        }

        result
    }
3145
3146 pub fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
3148 self.inner.proc.spawn_child(self.inner.cell.clone(), actor)
3149 }
3150
3151 pub fn spawn_with_name<C: Actor>(
3155 &self,
3156 name: &str,
3157 actor: C,
3158 ) -> anyhow::Result<ActorHandle<C>> {
3159 self.inner
3160 .proc
3161 .spawn_named_child(self.inner.cell.clone(), name, actor)
3162 }
3163
3164 pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> {
3166 self.inner.proc.child_instance(self.inner.cell.clone())
3167 }
3168
3169 pub async fn gspawn(&self, actor_type: &str, params: Data) -> anyhow::Result<AnyActorHandle> {
3174 self.gspawn_uid(actor_type, crate::id::Uid::anonymous(), params)
3175 .await
3176 }
3177
3178 pub async fn gspawn_uid(
3183 &self,
3184 actor_type: &str,
3185 uid: crate::id::Uid,
3186 params: Data,
3187 ) -> anyhow::Result<AnyActorHandle> {
3188 crate::actor::remote::Remote::global()
3189 .gspawn_child(
3190 &self.inner.proc,
3191 self.inner.cell.clone(),
3192 actor_type,
3193 uid,
3194 params,
3195 Flattrs::default(),
3196 )
3197 .await
3198 }
3199
3200 pub fn port<M: Message>(&self) -> PortHandle<M>
3203 where
3204 A: Handler<M>,
3205 {
3206 self.inner.ports.get()
3207 }
3208
3209 pub fn handle(&self) -> ActorHandle<A> {
3211 ActorHandle::new(self.inner.cell.clone(), Arc::clone(&self.inner.ports))
3212 }
3213
3214 pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
3216 self.inner.cell.bind(self.inner.ports.as_ref())
3217 }
3218
3219 #[doc(hidden)]
3221 pub fn mailbox_for_py(&self) -> &Mailbox {
3222 &self.inner.mailbox
3223 }
3224
3225 pub fn proc(&self) -> &Proc {
3227 &self.inner.proc
3228 }
3229
3230 #[doc(hidden)]
3234 pub fn clone_for_py(&self) -> Self {
3235 Self {
3236 inner: Arc::clone(&self.inner),
3237 }
3238 }
3239
3240 fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
3242 self.inner.cell.inner.actor_task_handle.get()
3243 }
3244
3245 pub fn sequencer(&self) -> &Sequencer {
3247 &self.inner.sequencer
3248 }
3249
3250 pub fn instance_id(&self) -> Uuid {
3252 self.inner.id
3253 }
3254
3255 pub fn parent_handle<P: Actor>(&self) -> Option<ActorHandle<P>> {
3257 let parent_cell = self.inner.cell.inner.parent.upgrade()?;
3258 let ports = if let Ok(ports) = parent_cell.inner.ports.clone().downcast() {
3259 ports
3260 } else {
3261 return None;
3262 };
3263 Some(ActorHandle::new(parent_cell, ports))
3264 }
3265}
3266
3267impl<A: Actor> context::Mailbox for Instance<A> {
3268 fn mailbox(&self) -> &Mailbox {
3269 &self.inner.mailbox
3270 }
3271}
3272
3273impl<A: Actor> context::Mailbox for Context<'_, A> {
3274 fn mailbox(&self) -> &Mailbox {
3275 &self.instance.inner.mailbox
3276 }
3277}
3278
3279impl<A: Actor> context::Mailbox for &Instance<A> {
3280 fn mailbox(&self) -> &Mailbox {
3281 &self.inner.mailbox
3282 }
3283}
3284
3285impl<A: Actor> context::Mailbox for &Context<'_, A> {
3286 fn mailbox(&self) -> &Mailbox {
3287 &self.instance.inner.mailbox
3288 }
3289}
3290
3291impl<A: Actor> context::Actor for Instance<A> {
3292 type A = A;
3293 fn instance(&self) -> &Instance<A> {
3294 self
3295 }
3296}
3297
3298impl<A: Actor> context::Actor for Context<'_, A> {
3299 type A = A;
3300 fn instance(&self) -> &Instance<A> {
3301 self
3302 }
3303}
3304
3305impl<A: Actor> context::Actor for &Instance<A> {
3306 type A = A;
3307 fn instance(&self) -> &Instance<A> {
3308 self
3309 }
3310}
3311
3312impl<A: Actor> context::Actor for &Context<'_, A> {
3313 type A = A;
3314 fn instance(&self) -> &Instance<A> {
3315 self
3316 }
3317}
3318
3319impl<A, M> crate::Endpoint<M> for &Instance<A>
3320where
3321 A: Actor + Handler<M>,
3322 M: Message,
3323{
3324 fn endpoint_location(&self) -> crate::EndpointLocation {
3325 crate::EndpointLocation::Actor(self.self_addr().clone())
3326 }
3327
3328 fn post<C>(self, cx: &C, message: M)
3329 where
3330 C: context::Actor,
3331 {
3332 let port = self.port();
3333 crate::Endpoint::post(&port, cx, message)
3334 }
3335}
3336
3337impl<A, M> crate::Endpoint<M> for &Context<'_, A>
3338where
3339 A: Actor + Handler<M>,
3340 M: Message,
3341{
3342 fn endpoint_location(&self) -> crate::EndpointLocation {
3343 crate::EndpointLocation::Actor(self.self_addr().clone())
3344 }
3345
3346 fn post<C>(self, cx: &C, message: M)
3347 where
3348 C: context::Actor,
3349 {
3350 crate::Endpoint::post(self.instance, cx, message)
3351 }
3352}
3353
3354impl<A, M> crate::Endpoint<M> for Instance<A>
3355where
3356 A: Actor + Handler<M>,
3357 M: Message,
3358{
3359 fn endpoint_location(&self) -> crate::EndpointLocation {
3360 crate::EndpointLocation::Actor(self.self_addr().clone())
3361 }
3362
3363 fn post<C>(self, cx: &C, message: M)
3364 where
3365 C: context::Actor,
3366 {
3367 crate::Endpoint::post(&self, cx, message)
3368 }
3369}
3370
3371impl Instance<()> {
3372 pub fn bind_handler_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
3374 assert!(
3375 self.actor_task_handle().is_none(),
3376 "can only bind handler port on instance with no running actor task"
3377 );
3378 self.inner.mailbox.bind_handler_port()
3379 }
3380}
3381
3382#[derive(Debug)]
3383enum ActorType {
3384 Named(&'static TypeInfo),
3385 Anonymous(&'static str),
3386}
3387
3388impl ActorType {
3389 fn type_name(&self) -> &str {
3390 match self {
3391 ActorType::Named(info) => info.typename(),
3392 ActorType::Anonymous(name) => name,
3393 }
3394 }
3395}
3396
3397#[derive(Clone)]
3403pub struct InstanceCell {
3404 inner: Arc<InstanceCellState>,
3405}
3406
3407impl fmt::Debug for InstanceCell {
3408 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3409 f.debug_struct("InstanceCell")
3410 .field("actor_id", &self.inner.actor_id)
3411 .field("actor_type", &self.inner.actor_type)
3412 .finish()
3413 }
3414}
3415
/// Shared runtime state of one actor instance, owned through `InstanceCell`.
struct InstanceCellState {
    /// This actor's address.
    actor_id: ActorAddr,

    /// How the actor's type is identified (see `ActorType::type_name`).
    actor_type: ActorType,

    /// The proc that owns this instance.
    proc: Proc,

    /// Sender side of the actor loop's signal port and supervision-event
    /// channel. `None` for detached cells, which cannot receive signals or
    /// supervision events.
    actor_loop: Option<(
        PortHandle<Signal>,
        mpsc::UnboundedSender<ActorSupervisionEvent>,
    )>,

    /// Receiver for the actor's current status.
    status: watch::Receiver<ActorStatus>,

    /// Weak link to the parent cell; empty when created without a parent.
    parent: WeakInstanceCell,

    /// Strongly held child cells, keyed by uid.
    children: DashMap<crate::id::Uid, InstanceCell>,

    /// Join handle of the actor's task; set at most once.
    actor_task_handle: OnceLock<JoinHandle<()>>,

    /// Port indices exported by `InstanceCell::bind`, mapped to the message
    /// type names bound on them.
    exported_named_ports: DashMap<u64, &'static str>,

    /// Incremented once per iteration of the actor's main loop.
    num_processed_messages: AtomicU64,

    /// When this cell was created.
    created_at: SystemTime,

    /// Handler info of the most recently dispatched message, if any.
    last_message_handler: RwLock<Option<HandlerInfo>>,

    /// Cumulative time spent in message handlers, in microseconds.
    total_processing_time_us: AtomicU64,

    /// Count of queued work items, shared (via `Arc`) with the enqueue and
    /// dequeue accounting.
    queue_depth: Arc<AtomicU64>,

    /// Telemetry recording (created with `record(64)`) whose span wraps
    /// message handling.
    recording: Recording,

    /// Attributes this actor publishes for introspection, if any.
    published_attrs: RwLock<Option<hyperactor_config::Attrs>>,

    /// Optional callback used by `query_child` to introspect a child by
    /// address.
    query_child_handler: RwLock<Option<Box<dyn (Fn(&Addr) -> IntrospectResult) + Send + Sync>>>,

    /// Supervision event recorded on the actor's termination path, if any.
    supervision_event: std::sync::Mutex<Option<crate::supervision::ActorSupervisionEvent>>,

    /// Whether this is a system actor; read by `InstanceCell::is_system`.
    is_system: AtomicBool,

    /// Type-erased `HandlerPorts<A>`; recovered with
    /// `InstanceCell::downcast_handle`.
    ports: Arc<dyn Any + Send + Sync>,
}
3507
3508impl InstanceCellState {
3509 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
3512 self.parent
3513 .upgrade()
3514 .filter(|parent| parent.inner.unlink(self))
3515 }
3516
3517 fn unlink(&self, child: &InstanceCellState) -> bool {
3519 assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
3520 self.children.remove(child.actor_id.uid()).is_some()
3521 }
3522}
3523
/// Chooses up to `excess` snapshot keys to evict.
///
/// Each entry is `(key, occurred_at)`. `occurred_at` is `Some(timestamp)`
/// (an RFC 3339 string) for failed actors and `None` for clean ones.
///
/// Eviction policy:
/// 1. Clean entries go first, in the order they appear in `entries`.
/// 2. If more evictions are needed, failed entries go newest first
///    (descending `occurred_at`, compared as strings). The earliest failures
///    are therefore kept longest. Equal timestamps keep their input order.
///
/// Returns at most `excess` keys, or fewer when `entries` is shorter.
fn select_eviction_candidates<K: Clone>(
    entries: &[(K, Option<String>)],
    excess: usize,
) -> Vec<K> {
    let mut to_remove: Vec<K> = entries
        .iter()
        .filter(|(_, occurred_at)| occurred_at.is_none())
        .take(excess)
        .map(|(key, _)| key.clone())
        .collect();

    // `to_remove.len() <= excess` by construction, so this cannot underflow.
    let remaining = excess - to_remove.len();
    if remaining > 0 {
        let mut failed: Vec<(&K, &str)> = entries
            .iter()
            .filter_map(|(key, occurred_at)| occurred_at.as_deref().map(|ts| (key, ts)))
            .collect();
        // Stable sort, newest timestamp first.
        failed.sort_by(|a, b| b.1.cmp(a.1));
        to_remove.extend(
            failed
                .into_iter()
                .take(remaining)
                .map(|(key, _)| key.clone()),
        );
    }

    to_remove
}
3571
3572impl InstanceCell {
3573 fn new(
3576 actor_id: ActorAddr,
3577 actor_type: ActorType,
3578 proc: Proc,
3579 actor_loop: Option<(
3580 PortHandle<Signal>,
3581 mpsc::UnboundedSender<ActorSupervisionEvent>,
3582 )>,
3583 status: watch::Receiver<ActorStatus>,
3584 parent: Option<InstanceCell>,
3585 ports: Arc<dyn Any + Send + Sync>,
3586 queue_depth: Arc<AtomicU64>,
3587 ) -> Self {
3588 let _ais = actor_id.to_string();
3589 let cell = Self {
3590 inner: Arc::new(InstanceCellState {
3591 actor_id: actor_id.clone(),
3592 actor_type,
3593 proc: proc.clone(),
3594 actor_loop,
3595 status,
3596 parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
3597 children: DashMap::new(),
3598 actor_task_handle: OnceLock::new(),
3599 exported_named_ports: DashMap::new(),
3600 num_processed_messages: AtomicU64::new(0),
3601 created_at: std::time::SystemTime::now(),
3602 last_message_handler: RwLock::new(None),
3603 total_processing_time_us: AtomicU64::new(0),
3604 queue_depth,
3605 recording: hyperactor_telemetry::recorder().record(64),
3606 published_attrs: RwLock::new(None),
3607 query_child_handler: RwLock::new(None),
3608 supervision_event: std::sync::Mutex::new(None),
3609 is_system: AtomicBool::new(false),
3610 ports,
3611 }),
3612 };
3613 cell.maybe_link_parent();
3614 proc.inner
3615 .instances
3616 .insert(actor_id.id().clone(), cell.downgrade());
3617 cell
3618 }
3619
3620 fn wrap(inner: Arc<InstanceCellState>) -> Self {
3621 Self { inner }
3622 }
3623
3624 pub fn actor_addr(&self) -> &ActorAddr {
3626 &self.inner.actor_id
3627 }
3628
3629 pub(crate) fn proc(&self) -> &Proc {
3631 &self.inner.proc
3632 }
3633
3634 pub(crate) fn uid(&self) -> &crate::id::Uid {
3636 self.inner.actor_id.uid()
3637 }
3638
3639 #[allow(dead_code)]
3641 pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
3642 self.inner.actor_task_handle.get()
3643 }
3644
3645 pub fn status(&self) -> &watch::Receiver<ActorStatus> {
3647 &self.inner.status
3648 }
3649
3650 pub fn supervision_event(&self) -> Option<crate::supervision::ActorSupervisionEvent> {
3653 self.inner.supervision_event.lock().unwrap().clone()
3654 }
3655
3656 fn signal_port(&self) -> PortHandle<Signal> {
3657 self.inner
3658 .actor_loop
3659 .as_ref()
3660 .map(|(signal_port, _)| signal_port.clone())
3661 .unwrap_or_else(|| panic!("{} has no runtime signal port", self.actor_addr()))
3662 }
3663
3664 pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
3666 if let Some((signal_port, _)) = &self.inner.actor_loop {
3667 static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
3669 let client = &CLIENT
3670 .get_or_init(|| Proc::runtime().client("global_signal_client").unwrap())
3671 .0;
3672 signal_port.post(&client, signal);
3673 Ok(())
3674 } else {
3675 tracing::warn!(
3676 "{}: attempted to send signal {} to detached actor",
3677 self.inner.actor_id,
3678 signal
3679 );
3680 Ok(())
3681 }
3682 }
3683
3684 pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) {
3693 match &self.inner.actor_loop {
3694 Some((_, supervision_tx)) => {
3695 if let Err(err) = supervision_tx.send(event.clone()) {
3696 if !event.is_error() {
3697 tracing::debug!(
3703 "{}: dropping non-error supervision event {}: {:?}",
3704 self.actor_addr(),
3705 event,
3706 err
3707 );
3708 return;
3709 }
3710 tracing::error!(
3711 "{}: failed to send supervision event to actor: {:?}. Crash the process.",
3712 self.actor_addr(),
3713 err
3714 );
3715 std::process::exit(1);
3716 }
3717 }
3718 None => {
3719 if !event.is_error() {
3720 tracing::debug!(
3721 "{}: dropping non-error supervision event {} to detached actor",
3722 self.actor_addr(),
3723 event,
3724 );
3725 return;
3726 }
3727 tracing::error!(
3728 "{}: failed: {}: cannot send supervision event to detached actor: crashing",
3729 self.actor_addr(),
3730 event,
3731 );
3732 std::process::exit(1);
3733 }
3734 }
3735 }
3736
3737 pub fn downgrade(&self) -> WeakInstanceCell {
3739 WeakInstanceCell {
3740 inner: Arc::downgrade(&self.inner),
3741 }
3742 }
3743
3744 fn link(&self, child: InstanceCell) {
3746 assert_eq!(self.actor_addr().proc_id(), child.actor_addr().proc_id());
3747 self.inner.children.insert(child.uid().clone(), child);
3748 }
3749
3750 fn unlink(&self, child: &InstanceCell) {
3752 assert_eq!(self.actor_addr().proc_id(), child.actor_addr().proc_id());
3753 self.inner.children.remove(child.uid());
3754 }
3755
3756 fn unlink_all(&self) {
3758 self.inner.children.clear();
3759 }
3760
3761 fn maybe_link_parent(&self) {
3763 if let Some(parent) = self.inner.parent.upgrade() {
3764 parent.link(self.clone());
3765 }
3766 }
3767
3768 fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
3771 self.inner.maybe_unlink_parent()
3772 }
3773
3774 fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, crate::id::Uid, InstanceCell>> {
3777 self.inner.children.iter()
3778 }
3779
3780 pub fn child_count(&self) -> usize {
3782 self.inner.children.len()
3783 }
3784
3785 pub fn child_actor_ids(&self) -> Vec<ActorAddr> {
3787 self.inner
3788 .children
3789 .iter()
3790 .map(|entry| entry.value().actor_addr().clone())
3791 .collect()
3792 }
3793
3794 fn get_child(&self, uid: &crate::id::Uid) -> Option<InstanceCell> {
3796 self.inner.children.get(uid).map(|child| child.clone())
3797 }
3798
3799 pub fn recording(&self) -> &Recording {
3801 &self.inner.recording
3802 }
3803
3804 pub fn created_at(&self) -> SystemTime {
3806 self.inner.created_at
3807 }
3808
3809 pub fn num_processed_messages(&self) -> u64 {
3811 self.inner.num_processed_messages.load(Ordering::SeqCst)
3812 }
3813
3814 pub fn last_message_handler(&self) -> Option<HandlerInfo> {
3816 self.inner.last_message_handler.read().unwrap().clone()
3817 }
3818
3819 pub fn total_processing_time_us(&self) -> u64 {
3821 self.inner.total_processing_time_us.load(Ordering::SeqCst)
3822 }
3823
3824 pub fn queue_depth(&self) -> u64 {
3826 self.inner.queue_depth.load(Ordering::Relaxed)
3827 }
3828
3829 pub fn parent(&self) -> Option<InstanceCell> {
3831 self.inner.parent.upgrade()
3832 }
3833
3834 pub fn actor_type_name(&self) -> &str {
3836 self.inner.actor_type.type_name()
3837 }
3838
3839 pub fn set_published_attrs(&self, attrs: hyperactor_config::Attrs) {
3841 *self.inner.published_attrs.write().unwrap() = Some(attrs);
3842 }
3843
3844 pub fn merge_published_attr<T: hyperactor_config::AttrValue>(
3847 &self,
3848 key: hyperactor_config::Key<T>,
3849 value: T,
3850 ) {
3851 self.inner
3852 .published_attrs
3853 .write()
3854 .unwrap()
3855 .get_or_insert_with(hyperactor_config::Attrs::new)
3856 .set(key, value);
3857 }
3858
3859 pub fn published_attrs(&self) -> Option<hyperactor_config::Attrs> {
3861 self.inner.published_attrs.read().unwrap().clone()
3862 }
3863
3864 pub fn set_query_child_handler(
3872 &self,
3873 handler: impl (Fn(&Addr) -> IntrospectResult) + Send + Sync + 'static,
3874 ) {
3875 *self.inner.query_child_handler.write().unwrap() = Some(Box::new(handler));
3876 }
3877
3878 pub fn query_child(&self, child_ref: &Addr) -> Option<IntrospectResult> {
3880 let guard = self.inner.query_child_handler.read().unwrap();
3881 guard.as_ref().map(|handler| handler(child_ref))
3882 }
3883
3884 pub fn is_system(&self) -> bool {
3886 self.inner.is_system.load(Ordering::Relaxed)
3887 }
3888
3889 pub fn store_terminated_snapshot(&self, payload: crate::introspect::IntrospectResult) {
3899 let snapshots = &self.inner.proc.inner.terminated_snapshots;
3900 snapshots.insert(
3901 self.actor_addr().id().clone(),
3902 TerminatedSnapshot {
3903 actor_addr: self.actor_addr().clone(),
3904 payload,
3905 },
3906 );
3907 let max = hyperactor_config::global::get(crate::config::TERMINATED_SNAPSHOT_RETENTION);
3908 let excess = snapshots.len().saturating_sub(max);
3909 if excess > 0 {
3910 let entries: Vec<_> = snapshots
3912 .iter()
3913 .map(|entry| {
3914 let occurred_at = serde_json::from_str::<hyperactor_config::Attrs>(
3915 &entry.value().payload.attrs,
3916 )
3917 .ok()
3918 .and_then(|attrs| {
3919 attrs
3921 .get(crate::introspect::FAILURE_ERROR_MESSAGE)
3922 .cloned()?;
3923 attrs
3925 .get(crate::introspect::FAILURE_OCCURRED_AT)
3926 .map(|t| humantime::format_rfc3339(*t).to_string())
3927 });
3928 (entry.value().actor_addr.clone(), occurred_at)
3929 })
3930 .collect();
3931
3932 for key in select_eviction_candidates(&entries, excess) {
3933 snapshots.remove(key.id());
3934 }
3935 }
3936 }
3937
3938 pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &HandlerPorts<A>) -> ActorRef<R> {
3941 <R as Binds<A>>::bind(ports);
3942 ports.bind::<Undeliverable<MessageEnvelope>>();
3954 for entry in ports.bound.iter() {
3956 self.inner
3957 .exported_named_ports
3958 .insert(*entry.key(), entry.value());
3959 }
3960 ActorRef::attest(ActorAddr::new(
3961 self.actor_addr().id().clone(),
3962 self.inner.proc.default_location(),
3963 ))
3964 }
3965
3966 pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
3968 let ports = Arc::clone(&self.inner.ports)
3969 .downcast::<HandlerPorts<A>>()
3970 .ok()?;
3971 Some(ActorHandle::new(self.clone(), ports))
3972 }
3973
3974 pub fn traverse<F>(&self, f: &mut F)
3978 where
3979 F: FnMut(&InstanceCell, usize),
3980 {
3981 self.traverse_inner(0, f);
3982 }
3983
3984 fn traverse_inner<F>(&self, depth: usize, f: &mut F)
3985 where
3986 F: FnMut(&InstanceCell, usize),
3987 {
3988 f(self, depth);
3989 let mut children: Vec<_> = self.child_iter().map(|r| r.value().clone()).collect();
3991 children.sort_by_key(|c| c.uid().clone());
3992 for child in children {
3993 child.traverse_inner(depth + 1, f);
3994 }
3995 }
3996}
3997
3998impl Drop for InstanceCellState {
3999 fn drop(&mut self) {
4000 if let Some(parent) = self.maybe_unlink_parent() {
4001 tracing::debug!(
4002 "instance {} was dropped with parent {} still linked",
4003 self.actor_id,
4004 parent.actor_addr()
4005 );
4006 }
4007 if self
4008 .proc
4009 .inner
4010 .instances
4011 .remove(self.actor_id.id())
4012 .is_none()
4013 {
4014 tracing::error!("instance {} was dropped but not in proc", self.actor_id);
4015 }
4016 }
4017}
4018
4019#[derive(Debug, Clone)]
4022pub struct WeakInstanceCell {
4023 inner: Weak<InstanceCellState>,
4024}
4025
4026impl Default for WeakInstanceCell {
4027 fn default() -> Self {
4028 Self::new()
4029 }
4030}
4031
4032impl WeakInstanceCell {
4033 pub fn new() -> Self {
4035 Self { inner: Weak::new() }
4036 }
4037
4038 pub fn upgrade(&self) -> Option<InstanceCell> {
4040 self.inner.upgrade().map(InstanceCell::wrap)
4041 }
4042}
4043
/// Per-actor registry of handler ports, keyed by message type.
///
/// Each port opened here enqueues incoming messages as `WorkCell`s on the
/// actor's work queue, and updates per-actor and per-proc queue accounting.
pub struct HandlerPorts<A: Actor> {
    /// Opened ports keyed by the message type's `TypeId`; each value is a
    /// `PortHandle<M>` for that type.
    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
    /// Port indices bound for remote delivery, mapped to the message type name.
    bound: DashMap<u64, &'static str>,
    /// Mailbox on which handler ports are opened.
    mailbox: Mailbox,
    /// Ordered sender feeding the actor's work queue.
    workq: OrderedSender<WorkCell<A>>,
    /// Per-actor queue-depth counter.
    queue_depth: Arc<AtomicU64>,
    /// Proc-wide queue statistics.
    proc_stats: Arc<ProcQueueStats>,
}
4058
impl<A: Actor> HandlerPorts<A> {
    /// Creates an empty port registry for one actor.
    fn new(
        mailbox: Mailbox,
        workq: OrderedSender<WorkCell<A>>,
        queue_depth: Arc<AtomicU64>,
        proc_stats: Arc<ProcQueueStats>,
    ) -> Self {
        Self {
            ports: DashMap::new(),
            bound: DashMap::new(),
            mailbox,
            workq,
            queue_depth,
            proc_stats,
        }
    }

    /// Returns the handler port for message type `M`, opening it on first use.
    ///
    /// Each message posted to a newly opened port is wrapped in a `WorkCell`
    /// that calls `Instance::handle_message`. The cell is counted as enqueued
    /// and then sent to the work queue:
    /// - with buffering enabled, `SeqInfo::Session` messages are sent in
    ///   sequence order and `SeqInfo::Direct` messages directly. A missing
    ///   `SEQ_INFO`, or a session seq of 0, is logged and returned as an error;
    /// - with buffering disabled, every message is sent directly.
    ///
    /// If enqueueing fails, the enqueue accounting is rolled back.
    ///
    /// # Panics
    /// Panics when asked to open a port for a type that bypasses the work
    /// queue; those ports must be pre-registered.
    pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
    where
        A: Handler<M>,
    {
        let key = TypeId::of::<M>();
        match self.ports.entry(key) {
            Entry::Vacant(entry) => {
                assert!(
                    !crate::ordering::is_bypass_workq_type_id(key),
                    "cannot provision bypass-workq port {} through `Ports::get`; \
                    it must be pre-registered via `open_message_port` in `Instance::new`",
                    std::any::type_name::<M>()
                );

                // Everything the enqueue closure needs is cloned up front so the
                // closure owns it.
                let type_info = TypeInfo::get_by_typeid(key);
                let workq = self.workq.clone();
                let actor_id = self.mailbox.actor_addr().to_string();
                let enqueue_depth = Arc::clone(&self.queue_depth);
                let enqueue_proc_stats = Arc::clone(&self.proc_stats);
                let enqueue = move |headers: Flattrs, msg: M| {
                    // Read before `headers` is moved into the work cell.
                    let seq_info = headers.get(SEQ_INFO);

                    let work = WorkCell::new(move |actor: &mut A, instance: &Instance<A>| {
                        Box::pin(async move {
                            // SAFETY: `type_info` was looked up from
                            // `TypeId::of::<M>()`, so it describes `M`.
                            unsafe {
                                instance
                                    .handle_message(actor, type_info, headers, msg)
                                    .await
                            }
                        })
                    });
                    // Count the item as enqueued before sending; undone below on failure.
                    account_enqueue(&enqueue_depth, &enqueue_proc_stats, &actor_id);
                    let result = if workq.enable_buffering {
                        match seq_info {
                            Some(SeqInfo::Session { session_id, seq }) => {
                                workq.send(session_id, seq, work).map_err(|e| match e {
                                    OrderedSenderError::InvalidZeroSeq(_) => {
                                        let error_msg = format!(
                                            "in enqueue func for {}, got seq 0 for message type {}",
                                            actor_id,
                                            std::any::type_name::<M>(),
                                        );
                                        tracing::error!(error_msg);
                                        anyhow::anyhow!(error_msg)
                                    }
                                    OrderedSenderError::SendError(e) => anyhow::Error::from(e),
                                    OrderedSenderError::FlushError(e) => e,
                                })
                            }
                            Some(SeqInfo::Direct) => {
                                workq.direct_send(work).map_err(anyhow::Error::from)
                            }
                            None => {
                                let error_msg = format!(
                                    "in enqueue func for {}, buffering is enabled, but SEQ_INFO is not set for message type {}",
                                    actor_id,
                                    std::any::type_name::<M>(),
                                );
                                tracing::error!(error_msg);
                                Err(anyhow::anyhow!(error_msg))
                            }
                        }
                    } else {
                        workq.direct_send(work).map_err(anyhow::Error::from)
                    };
                    if result.is_err() {
                        account_cancel_enqueue(&enqueue_depth, &enqueue_proc_stats, &actor_id);
                    }
                    result
                };
                let port = self.mailbox.open_handler_enqueue_port(enqueue);
                entry.insert(Box::new(port.clone()));
                port
            }
            Entry::Occupied(entry) => {
                let port = entry.get();
                // Entries are keyed by `TypeId::of::<M>()`, so this downcast
                // matches what the vacant branch inserted.
                port.downcast_ref::<PortHandle<M>>().unwrap().clone()
            }
        }
    }

    /// Binds `M`'s handler port for remote delivery at port index `M::port()`.
    ///
    /// Calling this again for the same type does nothing.
    ///
    /// # Panics
    /// Panics if the port index is already bound to a different type name.
    pub fn bind<M: RemoteMessage>(&self)
    where
        A: Handler<M>,
    {
        let port_index = M::port();
        match self.bound.entry(port_index) {
            Entry::Vacant(entry) => {
                self.get::<M>().bind_handler_port();
                entry.insert(M::typename());
            }
            Entry::Occupied(entry) => {
                assert_eq!(
                    *entry.get(),
                    M::typename(),
                    "bind {}: port index {} already bound to type {}",
                    M::typename(),
                    port_index,
                    entry.get(),
                );
            }
        }
    }
}
4204
4205#[cfg(test)]
4206mod tests {
4207 use std::assert_matches;
4208 use std::sync::atomic::AtomicBool;
4209
4210 use hyperactor_macros::export;
4211 use serde_json::json;
4212 use timed_test::async_timed_test;
4213 use tokio::sync::Barrier;
4214 use tokio::sync::oneshot;
4215 use tracing::Level;
4216 use tracing_subscriber::layer::SubscriberExt;
4217 use tracing_test::internal::logs_with_scope_contain;
4218
4219 use super::*;
4220 use crate as hyperactor;
4222 use crate::HandleClient;
4223 use crate::Handler;
4224 use crate::OncePortRef;
4225 use crate::PortRef;
4226 use crate::port::Port;
4227 use crate::testing::proc_supervison::ProcSupervisionCoordinator;
4228 use crate::testing::process_assertion::assert_termination;
4229
4230 #[derive(Debug, Default)]
4231 #[export]
4232 struct TestActor;
4233
4234 impl Actor for TestActor {}
4235
4236 #[derive(Debug)]
4237 struct DelayedSelfActor {
4238 ready: Option<OncePortRef<()>>,
4239 fired: Option<OncePortRef<()>>,
4240 delay: Duration,
4241 }
4242
4243 #[derive(Debug)]
4244 struct DelayedSelfTick;
4245
4246 #[async_trait]
4247 impl Actor for DelayedSelfActor {
4248 async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
4249 if let Some(ready) = self.ready.take() {
4250 ready.post(this, ());
4251 }
4252 this.post_after(this, DelayedSelfTick, self.delay);
4253 Ok(())
4254 }
4255 }
4256
4257 #[async_trait]
4258 impl Handler<DelayedSelfTick> for DelayedSelfActor {
4259 async fn handle(
4260 &mut self,
4261 cx: &crate::Context<Self>,
4262 _message: DelayedSelfTick,
4263 ) -> anyhow::Result<()> {
4264 if let Some(fired) = self.fired.take() {
4265 fired.post(cx, ());
4266 }
4267 Ok(())
4268 }
4269 }
4270
4271 #[derive(Debug)]
4272 struct DelayedPortActor {
4273 reply: Option<PortRef<u64>>,
4274 delay: Duration,
4275 }
4276
4277 #[async_trait]
4278 impl Actor for DelayedPortActor {
4279 async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
4280 this.post_after(
4281 self.reply.take().expect("reply port should be present"),
4282 123u64,
4283 self.delay,
4284 );
4285 Ok(())
4286 }
4287 }
4288
4289 #[derive(Handler, HandleClient, Debug)]
4290 enum TestActorMessage {
4291 Reply(oneshot::Sender<()>),
4292 Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
4293 Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
4294 Noop(),
4295 Fail(anyhow::Error),
4296 Panic(String),
4297 Spawn(oneshot::Sender<ActorHandle<TestActor>>),
4298 }
4299
4300 impl TestActor {
4301 async fn spawn_child(
4302 cx: &impl context::Actor,
4303 parent: &ActorHandle<TestActor>,
4304 ) -> ActorHandle<TestActor> {
4305 let (tx, rx) = oneshot::channel();
4306 parent.post(cx, TestActorMessage::Spawn(tx));
4307 rx.await.unwrap()
4308 }
4309 }
4310
4311 #[test]
4312 fn test_proc_identity_constructors() {
4313 let anonymous = Proc::anonymous();
4314 assert!(
4315 matches!(anonymous.proc_id().uid(), crate::id::Uid::Instance(_, None)),
4316 "anonymous proc must have an unlabeled instance id"
4317 );
4318 assert_eq!(anonymous.proc_id().label(), None);
4319
4320 let instance = Proc::instance("worker");
4321 assert!(
4322 matches!(
4323 instance.proc_id().uid(),
4324 crate::id::Uid::Instance(_, Some(label)) if label.as_str() == "worker"
4325 ),
4326 "instance proc must have a labeled instance id"
4327 );
4328 assert_eq!(
4329 instance.proc_id().label().map(|label| label.as_str()),
4330 Some("worker")
4331 );
4332
4333 let singleton = Proc::singleton("controller");
4334 assert!(
4335 matches!(
4336 singleton.proc_id().uid(),
4337 crate::id::Uid::Singleton(label) if label.as_str() == "controller"
4338 ),
4339 "singleton proc must have a singleton id"
4340 );
4341 assert_eq!(
4342 singleton.proc_id().label().map(|label| label.as_str()),
4343 Some("controller")
4344 );
4345 }
4346
4347 #[async_trait]
4348 #[crate::handle(TestActorMessage)]
4349 impl TestActorMessageHandler for TestActor {
4350 async fn reply(
4351 &mut self,
4352 _cx: &crate::Context<Self>,
4353 sender: oneshot::Sender<()>,
4354 ) -> Result<(), anyhow::Error> {
4355 sender.send(()).unwrap();
4356 Ok(())
4357 }
4358
4359 async fn wait(
4360 &mut self,
4361 _cx: &crate::Context<Self>,
4362 sender: oneshot::Sender<()>,
4363 receiver: oneshot::Receiver<()>,
4364 ) -> Result<(), anyhow::Error> {
4365 sender.send(()).unwrap();
4366 receiver.await.unwrap();
4367 Ok(())
4368 }
4369
4370 async fn forward(
4371 &mut self,
4372 cx: &crate::Context<Self>,
4373 destination: ActorHandle<TestActor>,
4374 message: Box<TestActorMessage>,
4375 ) -> Result<(), anyhow::Error> {
4376 destination.post(cx, *message);
4378 Ok(())
4379 }
4380
4381 async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
4382 Ok(())
4383 }
4384
4385 async fn fail(
4386 &mut self,
4387 _cx: &crate::Context<Self>,
4388 err: anyhow::Error,
4389 ) -> Result<(), anyhow::Error> {
4390 Err(err)
4391 }
4392
4393 async fn panic(
4394 &mut self,
4395 _cx: &crate::Context<Self>,
4396 err_msg: String,
4397 ) -> Result<(), anyhow::Error> {
4398 panic!("{}", err_msg);
4399 }
4400
4401 async fn spawn(
4402 &mut self,
4403 cx: &crate::Context<Self>,
4404 reply: oneshot::Sender<ActorHandle<TestActor>>,
4405 ) -> Result<(), anyhow::Error> {
4406 let handle = TestActor.spawn(cx)?;
4407 reply.send(handle).unwrap();
4408 Ok(())
4409 }
4410 }
4411
4412 #[tokio::test]
4413 async fn test_client_instance_can_bind_signal_port() {
4414 let proc = Proc::isolated();
4415 let (client, _) = proc.client("client").unwrap();
4416
4417 let (_signal_port, _signal_rx) = client.bind_handler_port::<Signal>();
4418 }
4419
4420 #[expect(
4421 clippy::await_holding_invalid_type,
4422 reason = "tracing_test::traced_test macro expansion holds tracing::span::Entered across awaits; can't be fixed in our code"
4423 )]
4424 #[tracing_test::traced_test]
4425 #[async_timed_test(timeout_secs = 30)]
4426 async fn test_spawn_actor() {
4427 let proc = Proc::isolated();
4428 let (client, _) = proc.client("client").unwrap();
4429 let handle = proc.spawn("test", TestActor).unwrap();
4430
4431 assert!(logs_contain(
4433 format!(
4434 "{}: spawned with {:?}",
4435 handle.actor_addr(),
4436 handle.cell().actor_task_handle().unwrap(),
4437 )
4438 .as_str()
4439 ));
4440
4441 let mut state = handle.status().clone();
4442
4443 let (tx, rx) = oneshot::channel::<()>();
4446 handle.post(&client, TestActorMessage::Reply(tx));
4447 rx.await.unwrap();
4448
4449 state
4450 .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
4451 .await
4452 .unwrap();
4453
4454 let (enter_tx, enter_rx) = oneshot::channel::<()>();
4456 let (exit_tx, exit_rx) = oneshot::channel::<()>();
4457
4458 handle.post(&client, TestActorMessage::Wait(enter_tx, exit_rx));
4459 enter_rx.await.unwrap();
4460 assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= std::time::SystemTime::now());
4461 exit_tx.send(()).unwrap();
4462
4463 state
4464 .wait_for(|state| matches!(*state, ActorStatus::Idle))
4465 .await
4466 .unwrap();
4467
4468 handle.drain_and_stop("test").unwrap();
4469 handle.await;
4470 assert_matches!(&*state.borrow(), ActorStatus::Stopped(reason) if reason == "test");
4471 }
4472
4473 #[async_timed_test(timeout_secs = 30)]
4474 async fn test_proc_actors_messaging() {
4475 let proc = Proc::isolated();
4476 let (client, _) = proc.client("client").unwrap();
4477 let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
4478 let second = proc.spawn::<TestActor>("second", TestActor).unwrap();
4479 let (tx, rx) = oneshot::channel::<()>();
4480 let reply_message = TestActorMessage::Reply(tx);
4481 first.post(
4482 &client,
4483 TestActorMessage::Forward(second, Box::new(reply_message)),
4484 );
4485 rx.await.unwrap();
4486 }
4487
4488 #[tokio::test]
4493 async fn test_post_routes_by_proc_id() {
4494 use crate::mailbox::monitored_return_handle;
4495 use crate::testing::ids::test_actor_id;
4496
4497 #[derive(Clone)]
4498 struct CountingSender(Arc<AtomicUsize>);
4499
4500 #[async_trait]
4501 impl MailboxSender for CountingSender {
4502 fn post_unchecked(
4503 &self,
4504 _envelope: MessageEnvelope,
4505 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
4506 ) {
4507 self.0.fetch_add(1, Ordering::SeqCst);
4508 }
4509 }
4510
4511 let local_addr = ChannelAddr::Local(1);
4514 let remote_addr = ChannelAddr::Local(2);
4515
4516 let proc_local = ProcAddr::instance(local_addr.clone(), "shared");
4517 let proc_same_id_other_location =
4518 ProcAddr::new(proc_local.id().clone(), remote_addr.into());
4519 let proc_other_id_same_location = ProcAddr::instance(local_addr, "other");
4520 assert_eq!(
4521 proc_local.id(),
4522 proc_same_id_other_location.id(),
4523 "test setup: both procs must share a ProcId"
4524 );
4525 assert_ne!(
4526 proc_local.id(),
4527 proc_other_id_same_location.id(),
4528 "test setup: the remote proc must have a distinct ProcId"
4529 );
4530
4531 let forwarded = Arc::new(AtomicUsize::new(0));
4532 let proc = Proc::configured(
4533 proc_local.clone(),
4534 BoxedMailboxSender::new(CountingSender(forwarded.clone())),
4535 );
4536 let sender = test_actor_id("sender", "client");
4537
4538 let local_dest = proc_local.actor_addr("worker").port_addr(Port::from(1234));
4540 proc.post(
4541 MessageEnvelope::new(
4542 sender.clone(),
4543 local_dest,
4544 wirevalue::Any::serialize(&1u64).unwrap(),
4545 Flattrs::new(),
4546 ),
4547 monitored_return_handle(),
4548 );
4549 assert_eq!(forwarded.load(Ordering::SeqCst), 0);
4550
4551 let same_id_other_location_dest = proc_same_id_other_location
4553 .actor_addr("worker")
4554 .port_addr(Port::from(1234));
4555 proc.post(
4556 MessageEnvelope::new(
4557 sender.clone(),
4558 same_id_other_location_dest,
4559 wirevalue::Any::serialize(&1u64).unwrap(),
4560 Flattrs::new(),
4561 ),
4562 monitored_return_handle(),
4563 );
4564 assert_eq!(forwarded.load(Ordering::SeqCst), 0);
4565
4566 let other_id_same_location_dest = proc_other_id_same_location
4568 .actor_addr("worker")
4569 .port_addr(Port::from(1234));
4570 proc.post(
4571 MessageEnvelope::new(
4572 sender,
4573 other_id_same_location_dest,
4574 wirevalue::Any::serialize(&1u64).unwrap(),
4575 Flattrs::new(),
4576 ),
4577 monitored_return_handle(),
4578 );
4579 assert_eq!(forwarded.load(Ordering::SeqCst), 1);
4580 }
4581
4582 #[test]
4583 fn test_local_delivery_service_and_local_compare_full_proc_addr() {
4584 for name in [LEGACY_SERVICE_PROC_NAME, LEGACY_LOCAL_PROC_NAME] {
4585 let local = ProcAddr::singleton(ChannelAddr::Local(1), name);
4586 let same_id_other_location = ProcAddr::singleton(ChannelAddr::Local(2), name);
4587 let proc = match name {
4588 LEGACY_SERVICE_PROC_NAME => Proc::legacy_service_pseudo_singleton(
4589 ChannelAddr::Local(1),
4590 BoxedMailboxSender::new(PanickingMailboxSender),
4591 ),
4592 LEGACY_LOCAL_PROC_NAME => Proc::legacy_local_pseudo_singleton(
4593 ChannelAddr::Local(1),
4594 BoxedMailboxSender::new(PanickingMailboxSender),
4595 ),
4596 _ => unreachable!("test only covers legacy pseudo-singletons"),
4597 };
4598
4599 assert_eq!(local.id(), same_id_other_location.id());
4600 assert!(proc.is_local_delivery_target(&local));
4601 assert!(!proc.is_local_delivery_target(&same_id_other_location));
4602 }
4603
4604 let shared = ProcAddr::singleton(ChannelAddr::Local(1), "shared");
4605 let shared_other_location = ProcAddr::singleton(ChannelAddr::Local(2), "shared");
4606 let proc = Proc::configured(
4607 shared.clone(),
4608 BoxedMailboxSender::new(PanickingMailboxSender),
4609 );
4610 assert!(proc.is_local_delivery_target(&shared_other_location));
4611
4612 let service_instance = ProcAddr::instance(ChannelAddr::Local(1), "service");
4613 let service_instance_other_location =
4614 ProcAddr::new(service_instance.id().clone(), ChannelAddr::Local(2).into());
4615 let proc = Proc::configured(
4616 service_instance,
4617 BoxedMailboxSender::new(PanickingMailboxSender),
4618 );
4619 assert!(proc.is_local_delivery_target(&service_instance_other_location));
4620 }
4621
4622 #[test]
4623 fn test_legacy_pseudo_singletons_use_dedicated_constructors() {
4624 for name in [LEGACY_SERVICE_PROC_NAME, LEGACY_LOCAL_PROC_NAME] {
4625 let result = std::panic::catch_unwind(|| {
4626 Proc::configured(
4627 ProcAddr::singleton(ChannelAddr::Local(1), name),
4628 BoxedMailboxSender::new(PanickingMailboxSender),
4629 );
4630 });
4631 assert!(result.is_err());
4632 }
4633
4634 let service = Proc::legacy_service_pseudo_singleton(
4635 ChannelAddr::Local(1),
4636 BoxedMailboxSender::new(PanickingMailboxSender),
4637 );
4638 assert_eq!(
4639 service.proc_addr().id().uid().to_string(),
4640 LEGACY_SERVICE_PROC_NAME
4641 );
4642
4643 let local = Proc::legacy_local_pseudo_singleton(
4644 ChannelAddr::Local(2),
4645 BoxedMailboxSender::new(PanickingMailboxSender),
4646 );
4647 assert_eq!(
4648 local.proc_addr().id().uid().to_string(),
4649 LEGACY_LOCAL_PROC_NAME
4650 );
4651 }
4652
4653 #[tokio::test]
4654 async fn test_mailbox_muxer_delivers_by_actor_id() {
4655 use crate::mailbox::PortLocation;
4656 use crate::mailbox::monitored_return_handle;
4657 use crate::testing::ids::test_actor_id;
4658
4659 let proc = Proc::isolated();
4660 let (instance, _) = proc.client("worker").unwrap();
4661 let (port, mut receiver) = instance.bind_handler_port::<u64>();
4662
4663 let PortLocation::Bound(default_dest) = port.location() else {
4664 panic!("actor port must be bound");
4665 };
4666 let alternate_dest =
4667 PortAddr::new(default_dest.id().clone(), ChannelAddr::Local(9876).into());
4668
4669 proc.post(
4670 MessageEnvelope::serialize(
4671 test_actor_id("sender", "client"),
4672 alternate_dest,
4673 &123u64,
4674 Flattrs::new(),
4675 )
4676 .unwrap(),
4677 monitored_return_handle(),
4678 );
4679
4680 assert_eq!(receiver.recv().await.unwrap(), 123);
4681 }
4682
4683 #[test]
4684 fn test_default_location_changes_new_bindings_not_lookup() {
4685 let proc = Proc::isolated();
4686 let gateway = proc.gateway();
4687 let (_instance, handle) = proc.client("worker").unwrap();
4688
4689 let first_ref: ActorRef<()> = handle.bind();
4690 let new_location = ChannelAddr::Local(9876).into();
4691 gateway.set_default_location(new_location);
4692 let second_ref: ActorRef<()> = handle.bind();
4693
4694 assert_eq!(first_ref.actor_addr().id(), second_ref.actor_addr().id());
4695 assert_ne!(
4696 first_ref.actor_addr().location(),
4697 second_ref.actor_addr().location()
4698 );
4699 assert_eq!(second_ref.actor_addr().location(), &proc.default_location());
4700 assert_eq!(proc.default_location(), gateway.default_location());
4701 assert_eq!(proc.proc_addr(), gateway.proc_addr(proc.proc_id()));
4702 assert!(proc.get_instance(second_ref.actor_addr()).is_some());
4703 }
4704
4705 #[test]
4706 fn test_builder_procs_can_share_gateway_with_distinct_ids() {
4707 let gateway = Gateway::new();
4708 let first = Proc::builder()
4709 .proc_id(ProcId::instance(Label::strip("first")))
4710 .shared_gateway(gateway.clone())
4711 .build()
4712 .unwrap();
4713 let second = Proc::builder()
4714 .proc_id(ProcId::instance(Label::strip("second")))
4715 .shared_gateway(gateway.clone())
4716 .build()
4717 .unwrap();
4718
4719 assert_ne!(first.proc_id(), second.proc_id());
4720 assert_eq!(first.default_location(), second.default_location());
4721
4722 let new_location = ChannelAddr::Local(9876).into();
4723 gateway.set_default_location(new_location);
4724
4725 assert_eq!(first.default_location(), gateway.default_location());
4726 assert_eq!(second.default_location(), gateway.default_location());
4727 assert_eq!(first.proc_addr(), gateway.proc_addr(first.proc_id()));
4728 assert_eq!(second.proc_addr(), gateway.proc_addr(second.proc_id()));
4729 }
4730
4731 #[test]
4732 fn test_isolated_procs_use_distinct_gateways() {
4733 let first = Proc::isolated();
4734 let second = Proc::isolated();
4735 let second_location = second.default_location();
4736
4737 first
4738 .gateway()
4739 .set_default_location(ChannelAddr::Local(9876).into());
4740
4741 assert_ne!(first.proc_id(), second.proc_id());
4742 assert_ne!(first.default_location(), second_location);
4743 assert_eq!(second.default_location(), second_location);
4744 }
4745
4746 #[tokio::test]
4747 async fn test_gateway_serve_updates_location_and_stops() {
4748 use crate::mailbox::PortLocation;
4749 use crate::mailbox::monitored_return_handle;
4750 use crate::testing::ids::test_actor_id;
4751
4752 let proc = Proc::isolated();
4753 let gateway = proc.gateway();
4754 let initial_location = proc.default_location();
4755 let (client, _) = proc.client("client").unwrap();
4756 let (port, mut receiver) = client.bind_handler_port::<u64>();
4757 let PortLocation::Bound(default_dest) = port.location() else {
4758 panic!("handler port must be bound");
4759 };
4760
4761 async fn send_to_location(
4762 location: Location,
4763 default_dest: &PortAddr,
4764 value: u64,
4765 receiver: &mut PortReceiver<u64>,
4766 ) {
4767 let dest = PortAddr::new(default_dest.id().clone(), location.clone());
4768 let sender = MailboxClient::dial(location.addr().clone()).unwrap();
4769 sender.post(
4770 MessageEnvelope::serialize(
4771 test_actor_id("sender", "client"),
4772 dest,
4773 &value,
4774 Flattrs::new(),
4775 )
4776 .unwrap(),
4777 monitored_return_handle(),
4778 );
4779 sender.flush().await.unwrap();
4780 let received = tokio::time::timeout(Duration::from_secs(5), receiver.recv())
4781 .await
4782 .unwrap()
4783 .unwrap();
4784 assert_eq!(received, value);
4785 }
4786
4787 let server = Gateway::serve(&gateway, ChannelAddr::any(ChannelTransport::Local)).unwrap();
4788
4789 assert_eq!(proc.default_location(), initial_location);
4790 assert_eq!(proc.default_location(), gateway.default_location());
4791 assert_eq!(proc.proc_addr(), gateway.proc_addr(proc.proc_id()));
4792 send_to_location(initial_location.clone(), &default_dest, 1, &mut receiver).await;
4793
4794 let next_server =
4795 Gateway::serve(&gateway, ChannelAddr::any(ChannelTransport::Local)).unwrap();
4796 let next_location = proc.default_location();
4797
4798 assert_ne!(proc.default_location(), initial_location);
4799 assert_eq!(proc.default_location(), gateway.default_location());
4800 assert_eq!(proc.proc_addr(), gateway.proc_addr(proc.proc_id()));
4801 send_to_location(next_location.clone(), &default_dest, 2, &mut receiver).await;
4802 send_to_location(initial_location.clone(), &default_dest, 3, &mut receiver).await;
4803
4804 next_server.stop("test complete");
4805 next_server.await.unwrap().unwrap();
4806
4807 assert_eq!(proc.default_location(), initial_location);
4808 assert_eq!(proc.default_location(), gateway.default_location());
4809 assert!(MailboxClient::dial(next_location.addr().clone()).is_err());
4810 send_to_location(initial_location.clone(), &default_dest, 4, &mut receiver).await;
4811
4812 server.stop("test complete");
4813 server.await.unwrap().unwrap();
4814
4815 assert_eq!(proc.default_location(), initial_location);
4816 assert_eq!(proc.default_location(), gateway.default_location());
4817 assert!(MailboxClient::dial(initial_location.addr().clone()).is_err());
4818 }
4819
4820 #[tokio::test]
4821 async fn test_direct_proc_server_stops_via_join_mailbox_server() {
4822 let proc = Proc::direct(
4823 ChannelAddr::any(ChannelTransport::Local),
4824 "direct".to_string(),
4825 )
4826 .unwrap();
4827
4828 assert_eq!(proc.proc_addr(), proc.gateway().proc_addr(proc.proc_id()));
4829
4830 proc.join_mailbox_server().await;
4831 }
4832
4833 #[tokio::test]
4834 async fn test_local_only_gateway_returns_undeliverable_messages() {
4835 use crate::testing::ids::test_actor_id;
4836
4837 let proc = Proc::isolated();
4838 let (client, _) = proc.client("client").unwrap();
4839 let (return_handle, mut undeliverable_rx) =
4840 client.open_port::<Undeliverable<MessageEnvelope>>();
4841 let remote_proc = ProcAddr::instance(ChannelAddr::Local(1234), "remote");
4842 let remote_dest = remote_proc.actor_addr("worker").port_addr(Port::from(0));
4843
4844 proc.post(
4845 MessageEnvelope::serialize(
4846 test_actor_id("sender", "client"),
4847 remote_dest.clone(),
4848 &123u64,
4849 Flattrs::new(),
4850 )
4851 .unwrap(),
4852 return_handle,
4853 );
4854
4855 let Undeliverable::Message(envelope) = undeliverable_rx.recv().await.unwrap() else {
4856 panic!("expected returned message");
4857 };
4858 assert_eq!(envelope.dest(), &remote_dest);
4859 }
4860
4861 #[derive(Debug, Default)]
4862 #[export]
4863 struct LookupTestActor;
4864
4865 impl Actor for LookupTestActor {}
4866
4867 #[derive(Handler, HandleClient, Debug)]
4868 enum LookupTestMessage {
4869 ActorExists(ActorRef<TestActor>, #[reply] OncePortRef<bool>),
4870 }
4871
4872 #[async_trait]
4873 #[crate::handle(LookupTestMessage)]
4874 impl LookupTestMessageHandler for LookupTestActor {
4875 async fn actor_exists(
4876 &mut self,
4877 cx: &crate::Context<Self>,
4878 actor_ref: ActorRef<TestActor>,
4879 ) -> Result<bool, anyhow::Error> {
4880 Ok(actor_ref.downcast_handle(cx).is_some())
4881 }
4882 }
4883
4884 #[async_timed_test(timeout_secs = 30)]
4885 async fn test_actor_lookup() {
4886 let proc = Proc::isolated();
4887 let (client, _handle) = proc.client("client").unwrap();
4888
4889 let target_actor = proc.spawn::<TestActor>("target", TestActor).unwrap();
4890 let target_actor_ref = target_actor.bind();
4891 let lookup_actor = proc
4892 .spawn::<LookupTestActor>("lookup", LookupTestActor)
4893 .unwrap();
4894
4895 assert!(
4896 lookup_actor
4897 .actor_exists(&client, target_actor_ref.clone())
4898 .await
4899 .unwrap()
4900 );
4901
4902 assert!(
4904 !lookup_actor
4905 .actor_exists(
4906 &client,
4907 ActorRef::attest(target_actor.actor_addr().anonymous_child())
4908 )
4909 .await
4910 .unwrap()
4911 );
4912 assert!(
4914 !lookup_actor
4915 .actor_exists(&client, ActorRef::attest(lookup_actor.actor_addr().clone()))
4916 .await
4917 .unwrap()
4918 );
4919
4920 target_actor.drain_and_stop("test").unwrap();
4921 target_actor.await;
4922
4923 assert!(
4924 !lookup_actor
4925 .actor_exists(&client, target_actor_ref)
4926 .await
4927 .unwrap()
4928 );
4929
4930 lookup_actor.drain_and_stop("test").unwrap();
4931 lookup_actor.await;
4932 }
4933
4934 fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
4935 assert_eq!(
4936 child.actor_addr().proc_addr(),
4937 parent.actor_addr().proc_addr()
4938 );
4939 assert_eq!(
4940 child.inner.parent.upgrade().unwrap().actor_addr(),
4941 parent.actor_addr()
4942 );
4943 assert_matches!(
4944 parent.inner.children.get(child.uid()),
4945 Some(node) if node.actor_addr() == child.actor_addr()
4946 );
4947 }
4948
4949 #[expect(
4950 clippy::await_holding_invalid_type,
4951 reason = "tracing_test::traced_test macro expansion holds tracing::span::Entered across awaits; can't be fixed in our code"
4952 )]
4953 #[tracing_test::traced_test]
4954 #[async_timed_test(timeout_secs = 30)]
4955 async fn test_spawn_child() {
4956 let proc = Proc::isolated();
4957 let (client, _) = proc.client("client").unwrap();
4958
4959 let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
4960 let second = TestActor::spawn_child(&client, &first).await;
4961 let third = TestActor::spawn_child(&client, &second).await;
4962
4963 assert!(logs_with_scope_contain(
4965 "hyperactor::proc",
4966 format!(
4967 "{}: spawned with {:?}",
4968 first.actor_addr(),
4969 first.cell().actor_task_handle().unwrap()
4970 )
4971 .as_str()
4972 ));
4973 assert!(logs_with_scope_contain(
4974 "hyperactor::proc",
4975 format!(
4976 "{}: spawned with {:?}",
4977 second.actor_addr(),
4978 second.cell().actor_task_handle().unwrap()
4979 )
4980 .as_str()
4981 ));
4982 assert!(logs_with_scope_contain(
4983 "hyperactor::proc",
4984 format!(
4985 "{}: spawned with {:?}",
4986 third.actor_addr(),
4987 third.cell().actor_task_handle().unwrap()
4988 )
4989 .as_str()
4990 ));
4991
4992 assert_eq!(first.actor_addr().proc_addr(), proc.proc_addr());
4994 assert_eq!(second.actor_addr().proc_addr(), proc.proc_addr());
4995 assert_eq!(third.actor_addr().proc_addr(), proc.proc_addr());
4996
4997 validate_link(third.cell(), second.cell());
4999 validate_link(second.cell(), first.cell());
5000 assert!(first.cell().inner.parent.upgrade().is_none());
5001
5002 let third_cell = third.cell().clone();
5005 third.drain_and_stop("test").unwrap();
5006 third.await;
5007 assert!(third_cell.inner.children.is_empty());
5008 drop(third_cell);
5009 validate_link(second.cell(), first.cell());
5010
5011 let second_cell = second.cell().clone();
5012 second.drain_and_stop("test").unwrap();
5013 second.await;
5014 assert!(second_cell.inner.children.is_empty());
5015 drop(second_cell);
5016
5017 let first_cell = first.cell().clone();
5018 first.drain_and_stop("test").unwrap();
5019 first.await;
5020 assert!(first_cell.inner.children.is_empty());
5021 }
5022
5023 #[async_timed_test(timeout_secs = 30)]
5024 async fn test_child_lifecycle() {
5025 let proc = Proc::isolated();
5026 let (client, _) = proc.client("client").unwrap();
5027
5028 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
5029 let root_1 = TestActor::spawn_child(&client, &root).await;
5030 let root_2 = TestActor::spawn_child(&client, &root).await;
5031 let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
5032
5033 root.drain_and_stop("test").unwrap();
5034 root.await;
5035
5036 for actor in [root_1, root_2, root_2_1] {
5037 assert!(
5038 actor
5039 .port::<TestActorMessage>()
5040 .try_send(&client, TestActorMessage::Noop())
5041 .is_err()
5042 );
5043 assert_matches!(actor.await, ActorStatus::Stopped(reason) if reason == "parent stopping");
5044 }
5045 }
5046
5047 #[derive(Debug)]
5048 struct DeferredStopActor {
5049 stop_started: Arc<tokio::sync::Notify>,
5050 release_stop: Arc<tokio::sync::Notify>,
5051 }
5052
5053 #[async_trait]
5054 impl Actor for DeferredStopActor {
5055 async fn handle_stop(
5056 &mut self,
5057 this: &Instance<Self>,
5058 mode: StopMode,
5059 reason: &str,
5060 ) -> Result<(), anyhow::Error> {
5061 let this = this.clone_for_py();
5062 let release_stop = Arc::clone(&self.release_stop);
5063 let reason = reason.to_string();
5064 this.close();
5065 self.stop_started.notify_one();
5066 tokio::spawn(async move {
5067 release_stop.notified().await;
5068 match mode {
5069 StopMode::Stop => this.exit(&reason).unwrap(),
5070 StopMode::DrainAndStop => this.exit_after_drain(&reason).unwrap(),
5071 }
5072 });
5073 Ok(())
5074 }
5075 }
5076
5077 #[async_trait]
5078 impl Handler<()> for DeferredStopActor {
5079 async fn handle(&mut self, _cx: &crate::Context<Self>, _message: ()) -> anyhow::Result<()> {
5080 Ok(())
5081 }
5082 }
5083
5084 #[async_timed_test(timeout_secs = 30)]
5085 async fn test_handle_stop_can_defer_exit() {
5086 let proc = Proc::isolated();
5087 let stop_started = Arc::new(tokio::sync::Notify::new());
5088 let release_stop = Arc::new(tokio::sync::Notify::new());
5089 let handle = proc
5090 .spawn(
5091 "deferred_stop",
5092 DeferredStopActor {
5093 stop_started: Arc::clone(&stop_started),
5094 release_stop: Arc::clone(&release_stop),
5095 },
5096 )
5097 .unwrap();
5098
5099 let mut status = handle.status();
5100 handle.stop("test").unwrap();
5101 stop_started.notified().await;
5102 status
5103 .wait_for(|state| matches!(state, ActorStatus::Stopping))
5104 .await
5105 .unwrap();
5106
5107 release_stop.notify_one();
5108 assert_matches!(handle.await, ActorStatus::Stopped(reason) if reason == "test");
5109 }
5110
5111 #[async_timed_test(timeout_secs = 30)]
5112 async fn test_drain_and_stop_closes_handler_ingress() {
5113 let proc = Proc::isolated();
5114 let (client, _) = proc.client("client").unwrap();
5115 let stop_started = Arc::new(tokio::sync::Notify::new());
5116 let release_stop = Arc::new(tokio::sync::Notify::new());
5117 let handle = proc
5118 .spawn(
5119 "deferred_drain_stop",
5120 DeferredStopActor {
5121 stop_started: Arc::clone(&stop_started),
5122 release_stop: Arc::clone(&release_stop),
5123 },
5124 )
5125 .unwrap();
5126
5127 handle.drain_and_stop("test").unwrap();
5128 stop_started.notified().await;
5129
5130 let err = handle.port::<()>().try_send(&client, ()).unwrap_err();
5133 assert_matches!(err.kind(), crate::mailbox::MailboxSenderErrorKind::Closed);
5134
5135 release_stop.notify_one();
5136 assert_matches!(handle.await, ActorStatus::Stopped(reason) if reason == "test");
5137 }
5138
5139 #[async_timed_test(timeout_secs = 30)]
5140 async fn test_parent_failure() {
5141 let proc = Proc::isolated();
5142 let (client, _) = proc.client("client").unwrap();
5143 let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
5146
5147 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
5148 let root_1 = TestActor::spawn_child(&client, &root).await;
5149 let root_2 = TestActor::spawn_child(&client, &root).await;
5150 let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
5151
5152 root_2.post(
5153 &client,
5154 TestActorMessage::Fail(anyhow::anyhow!("some random failure")),
5155 );
5156 let _root_2_actor_id = root_2.actor_addr().clone();
5157 assert_matches!(
5158 root_2.await,
5159 ActorStatus::Failed(err) if err.to_string() == "some random failure"
5160 );
5161
5162 assert_matches!(
5166 root.await,
5167 ActorStatus::Failed(err) if err.to_string().contains("some random failure")
5168 );
5169 assert_matches!(root_2_1.await, ActorStatus::Stopped(_));
5170 assert_matches!(root_1.await, ActorStatus::Stopped(_));
5171 }
5172
5173 #[async_timed_test(timeout_secs = 30)]
5174 async fn test_multi_handler() {
5175 #[derive(Debug)]
5179 struct TestActor(Arc<AtomicUsize>);
5180
5181 #[async_trait]
5182 impl Actor for TestActor {}
5183
5184 #[async_trait]
5185 impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
5186 async fn handle(
5187 &mut self,
5188 cx: &crate::Context<Self>,
5189 message: OncePortHandle<PortHandle<usize>>,
5190 ) -> anyhow::Result<()> {
5191 message.post(cx, cx.port());
5192 Ok(())
5193 }
5194 }
5195
5196 #[async_trait]
5197 impl Handler<usize> for TestActor {
5198 async fn handle(
5199 &mut self,
5200 _cx: &crate::Context<Self>,
5201 message: usize,
5202 ) -> anyhow::Result<()> {
5203 self.0.fetch_add(message, Ordering::SeqCst);
5204 Ok(())
5205 }
5206 }
5207
5208 let proc = Proc::isolated();
5209 let state = Arc::new(AtomicUsize::new(0));
5210 let actor = TestActor(state.clone());
5211 let handle = proc.spawn::<TestActor>("test", actor).unwrap();
5212 let (client, _) = proc.client("client").unwrap();
5213 let (tx, rx) = client.open_once_port();
5214 handle.post(&client, tx);
5215 let usize_handle = rx.recv().await.unwrap();
5216 usize_handle.post(&client, 123);
5217
5218 handle.drain_and_stop("test").unwrap();
5219 handle.await;
5220
5221 assert_eq!(state.load(Ordering::SeqCst), 123);
5222 }
5223
5224 #[async_timed_test(timeout_secs = 30)]
5225 async fn test_post_after_self_message() {
5226 let proc = Proc::isolated();
5227 let (client, _) = proc.client("client").unwrap();
5228 let (ready, ready_rx) = client.open_once_port();
5229 let (fired, fired_rx) = client.open_once_port();
5230 let delay = Duration::from_millis(50);
5231 let start = tokio::time::Instant::now();
5232 let handle = proc
5233 .spawn(
5234 "test",
5235 DelayedSelfActor {
5236 ready: Some(ready.bind()),
5237 fired: Some(fired.bind()),
5238 delay,
5239 },
5240 )
5241 .unwrap();
5242
5243 ready_rx.recv().await.unwrap();
5244 fired_rx.recv().await.unwrap();
5245
5246 assert!(start.elapsed() >= delay);
5247 handle.drain_and_stop("test").unwrap();
5248 handle.await;
5249 }
5250
5251 #[async_timed_test(timeout_secs = 30)]
5252 async fn test_post_after_port_ref() {
5253 let proc = Proc::isolated();
5254 let (client, _) = proc.client("client").unwrap();
5255 let (reply, mut reply_rx) = client.open_port();
5256 let delay = Duration::from_millis(50);
5257 let start = tokio::time::Instant::now();
5258 let handle = proc
5259 .spawn(
5260 "test",
5261 DelayedPortActor {
5262 reply: Some(reply.bind()),
5263 delay,
5264 },
5265 )
5266 .unwrap();
5267
5268 assert_eq!(reply_rx.recv().await.unwrap(), 123);
5269 assert!(start.elapsed() >= delay);
5270 handle.drain_and_stop("test").unwrap();
5271 handle.await;
5272 }
5273
5274 #[async_timed_test(timeout_secs = 30)]
5275 async fn test_post_after_discards_pending_messages_on_shutdown() {
5276 let proc = Proc::isolated();
5277 let (client, _) = proc.client("client").unwrap();
5278 let (ready, ready_rx) = client.open_once_port();
5279 let (fired, fired_rx) = client.open_once_port();
5280 let handle = proc
5281 .spawn(
5282 "test",
5283 DelayedSelfActor {
5284 ready: Some(ready.bind()),
5285 fired: Some(fired.bind()),
5286 delay: Duration::from_secs(60),
5287 },
5288 )
5289 .unwrap();
5290
5291 ready_rx.recv().await.unwrap();
5292 handle.drain_and_stop("test").unwrap();
5293 assert_matches!(handle.await, ActorStatus::Stopped(reason) if reason == "test");
5294
5295 let result = tokio::time::timeout(Duration::from_millis(100), fired_rx.recv()).await;
5296 assert!(!matches!(result, Ok(Ok(()))));
5297 }
5298
5299 #[async_timed_test(timeout_secs = 30)]
5300 async fn test_actor_panic() {
5301 panic_handler::set_panic_hook();
5303
5304 let proc = Proc::isolated();
5305 let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
5308
5309 let (client, _handle) = proc.client("client").unwrap();
5310 let actor_handle = proc.spawn("test", TestActor).unwrap();
5311 actor_handle
5312 .panic(&client, "some random failure".to_string())
5313 .await
5314 .unwrap();
5315 let actor_status = actor_handle.await;
5316
5317 assert_matches!(actor_status, ActorStatus::Failed(_));
5321 if let ActorStatus::Failed(err) = actor_status {
5322 let error_msg = err.to_string();
5323 assert!(error_msg.contains("some random failure"));
5325 assert!(error_msg.contains("library/std/src/panicking.rs"));
5329 }
5330 }
5331
5332 #[cfg_attr(not(target_os = "linux"), ignore = "linux-only")]
5333 #[async_timed_test(timeout_secs = 30)]
5334 async fn test_local_supervision_propagation() {
5335 hyperactor_telemetry::initialize_logging_for_test();
5336
5337 #[derive(Debug)]
5338 struct TestActor {
5339 handled: Arc<AtomicBool>,
5340 notify: Arc<tokio::sync::Notify>,
5341 should_handle: bool,
5342 }
5343
5344 #[async_trait]
5345 impl Actor for TestActor {
5346 async fn handle_supervision_event(
5347 &mut self,
5348 _this: &Instance<Self>,
5349 _event: &ActorSupervisionEvent,
5350 ) -> Result<bool, anyhow::Error> {
5351 if !self.should_handle {
5352 return Ok(false);
5353 }
5354
5355 tracing::error!(
5356 "{}: supervision event received: {:?}",
5357 _this.self_addr(),
5358 _event
5359 );
5360 self.handled.store(true, Ordering::SeqCst);
5361 self.notify.notify_one();
5362 Ok(true)
5363 }
5364 }
5365
5366 #[async_trait]
5367 impl Handler<String> for TestActor {
5368 async fn handle(
5369 &mut self,
5370 cx: &crate::Context<Self>,
5371 message: String,
5372 ) -> anyhow::Result<()> {
5373 tracing::info!("{} received message: {}", cx.self_addr(), message);
5374 Err(anyhow::anyhow!(message))
5375 }
5376 }
5377
5378 let make_actor = |handled: &Arc<AtomicBool>, should_handle: bool| TestActor {
5379 handled: handled.clone(),
5380 notify: Arc::new(tokio::sync::Notify::new()),
5381 should_handle,
5382 };
5383
5384 let proc = Proc::isolated();
5385 let (client, _) = proc.client("client").unwrap();
5386 let (mut reported_event, _coordinator) =
5387 ProcSupervisionCoordinator::set(&proc).await.unwrap();
5388
5389 let root_state = Arc::new(AtomicBool::new(false));
5390 let root_1_state = Arc::new(AtomicBool::new(false));
5391 let root_1_notify = Arc::new(tokio::sync::Notify::new());
5392 let root_1_1_state = Arc::new(AtomicBool::new(false));
5393 let root_1_1_1_state = Arc::new(AtomicBool::new(false));
5394 let root_2_state = Arc::new(AtomicBool::new(false));
5395 let root_2_1_state = Arc::new(AtomicBool::new(false));
5396
5397 let root = proc
5398 .spawn::<TestActor>("root", make_actor(&root_state, false))
5399 .unwrap();
5400 let root_1 = proc
5401 .spawn_child::<TestActor>(
5402 root.cell().clone(),
5403 TestActor {
5404 handled: root_1_state.clone(),
5405 notify: root_1_notify.clone(),
5406 should_handle: true, },
5408 )
5409 .unwrap();
5410 let root_1_1 = proc
5411 .spawn_child::<TestActor>(root_1.cell().clone(), make_actor(&root_1_1_state, false))
5412 .unwrap();
5413 let root_1_1_1 = proc
5414 .spawn_child::<TestActor>(
5415 root_1_1.cell().clone(),
5416 make_actor(&root_1_1_1_state, false),
5417 )
5418 .unwrap();
5419 let root_2 = proc
5420 .spawn_child::<TestActor>(root.cell().clone(), make_actor(&root_2_state, false))
5421 .unwrap();
5422 let root_2_1 = proc
5423 .spawn_child::<TestActor>(root_2.cell().clone(), make_actor(&root_2_1_state, false))
5424 .unwrap();
5425
5426 root_1_1_1.post(&client, "some random failure".to_string());
5429
5430 let root_2_1_id = root_2_1.actor_addr().clone();
5433 root_2_1.post(&client, "some random failure".to_string());
5434
5435 root_1_notify.notified().await;
5439
5440 let event = reported_event.recv().await;
5443 assert_eq!(event.actor_id, root_2_1_id);
5444
5445 assert!(!root_state.load(Ordering::SeqCst));
5446 assert!(root_1_state.load(Ordering::SeqCst));
5447 assert!(!root_1_1_state.load(Ordering::SeqCst));
5448 assert!(!root_1_1_1_state.load(Ordering::SeqCst));
5449 assert!(!root_2_state.load(Ordering::SeqCst));
5450 assert!(!root_2_1_state.load(Ordering::SeqCst));
5451 }
5452
5453 #[async_timed_test(timeout_secs = 30)]
5454 async fn test_instance() {
5455 #[derive(Debug, Default)]
5456 struct TestActor;
5457
5458 impl Actor for TestActor {}
5459
5460 #[async_trait]
5461 impl Handler<(String, PortRef<String>)> for TestActor {
5462 async fn handle(
5463 &mut self,
5464 cx: &crate::Context<Self>,
5465 (message, port): (String, PortRef<String>),
5466 ) -> anyhow::Result<()> {
5467 port.post(cx, message);
5468 Ok(())
5469 }
5470 }
5471
5472 let proc = Proc::isolated();
5473
5474 let (instance, handle) = proc.client("my_test_actor").unwrap();
5475
5476 let child_actor = TestActor.spawn(&instance).unwrap();
5477
5478 let (port, mut receiver) = instance.open_port();
5479 child_actor.post(&instance, ("hello".to_string(), port.bind()));
5480
5481 let message = receiver.recv().await.unwrap();
5482 assert_eq!(message, "hello");
5483
5484 child_actor.drain_and_stop("test").unwrap();
5485 child_actor.await;
5486
5487 assert_eq!(*handle.status().borrow(), ActorStatus::Client);
5488 drop(instance);
5489 assert_matches!(*handle.status().borrow(), ActorStatus::Stopped(_));
5490 handle.await;
5491 }
5492
5493 #[cfg_attr(target_os = "macos", ignore = "tokio runtime fork assertion on macOS")]
5496 #[tokio::test]
5497 async fn test_proc_terminate_without_coordinator() {
5498 if std::env::var("CARGO_TEST").is_ok() {
5499 eprintln!("test skipped as it hangs when run by cargo in sandcastle");
5500 return;
5501 }
5502
5503 let process = async {
5504 let proc = Proc::isolated();
5505 let root = proc.spawn("root", TestActor).unwrap();
5509 let (client, _handle) = proc.client("client").unwrap();
5510 root.fail(&client, anyhow::anyhow!("some random failure"))
5511 .await
5512 .unwrap();
5513 tokio::time::sleep(Duration::from_secs(30)).await;
5517 };
5518
5519 assert_termination(|| process, 1).await.unwrap();
5520 }
5521
5522 fn trace_and_block(fut: impl Future) {
5523 tracing::subscriber::with_default(
5524 tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
5525 || {
5526 tokio::runtime::Builder::new_current_thread()
5527 .enable_all()
5528 .build()
5529 .unwrap()
5530 .block_on(fut)
5531 },
5532 );
5533 }
5534
/// Checks that tracing output emitted inside handlers is captured by the
/// per-actor recording. This covers both flat events (`tail()`) and the span
/// stack of a handler suspended mid-flight (`stacks()`).
#[ignore = "until trace recording is turned back on"]
#[test]
fn test_handler_logging() {
    /// Actor whose handlers emit tracing events or spans for inspection.
    #[derive(Debug, Default)]
    struct LoggingActor;

    impl Actor for LoggingActor {}

    impl LoggingActor {
        /// Posts a two-party barrier to the actor and waits on it. Assuming
        /// in-order delivery, returning means earlier messages were handled.
        async fn wait(cx: &impl context::Actor, handle: &ActorHandle<Self>) {
            let barrier = Arc::new(Barrier::new(2));
            handle.post(cx, barrier.clone());
            barrier.wait().await;
        }
    }

    #[async_trait]
    impl Handler<String> for LoggingActor {
        /// Emits the string as an INFO event's `message`.
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: String,
        ) -> anyhow::Result<()> {
            tracing::info!("{}", message);
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<u64> for LoggingActor {
        /// Emits an INFO event with a structured `number` field.
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: u64,
        ) -> anyhow::Result<()> {
            tracing::event!(Level::INFO, number = message);
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<Arc<Barrier>> for LoggingActor {
        /// Rendezvous with the sender (see `LoggingActor::wait`).
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: Arc<Barrier>,
        ) -> anyhow::Result<()> {
            message.wait().await;
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
        #[expect(
            clippy::await_holding_invalid_type,
            reason = "the `child_span` guard is deliberately held across both barrier waits so the test can observe it on the recorded span stack while the handler is suspended"
        )]
        /// Enters `child_span`, then parks between the two barriers so the
        /// test can snapshot the span stack while the span is entered.
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            barriers: Arc<(Barrier, Barrier)>,
        ) -> anyhow::Result<()> {
            let inner = tracing::span!(Level::INFO, "child_span");
            let _inner_guard = inner.enter();
            barriers.0.wait().await;
            barriers.1.wait().await;
            Ok(())
        }
    }

    trace_and_block(async {
        let proc = Proc::isolated();
        let (client, _) = proc.client("client").unwrap();
        let handle = LoggingActor.spawn_detached().unwrap();
        handle.post(&client, "hello world".to_string());
        handle.post(&client, "hello world again".to_string());
        handle.post(&client, 123u64);

        LoggingActor::wait(&client, &handle).await;

        let events = handle.cell().inner.recording.tail();
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].json_value(), json!({ "message": "hello world" }));
        assert_eq!(
            events[1].json_value(),
            json!({ "message": "hello world again" })
        );
        assert_eq!(events[2].json_value(), json!({ "number": 123 }));

        // Snapshot the span stacks between the two barriers, i.e. while the
        // handler is suspended inside `child_span`.
        let stacks = {
            let barriers = Arc::new((Barrier::new(2), Barrier::new(2)));
            handle.post(&client, Arc::clone(&barriers));
            barriers.0.wait().await;
            let stacks = handle.cell().inner.recording.stacks();
            barriers.1.wait().await;
            stacks
        };
        assert_eq!(stacks.len(), 1);
        assert_eq!(stacks[0].len(), 1);
        assert_eq!(stacks[0][0].name(), "child_span");
    })
}
5638
5639 #[async_timed_test(timeout_secs = 30)]
5640 async fn test_mailbox_closed_with_owner_stopped_reason() {
5641 let proc = Proc::isolated();
5642 let (client, _) = proc.client("client").unwrap();
5643 let actor_handle = proc.spawn("test", TestActor).unwrap();
5644
5645 let handle_for_send = actor_handle.clone();
5647
5648 actor_handle.drain_and_stop("healthy shutdown").unwrap();
5650 actor_handle.await;
5651
5652 let result = handle_for_send
5654 .port::<TestActorMessage>()
5655 .try_send(&client, TestActorMessage::Noop());
5656
5657 assert!(result.is_err(), "send should fail when actor is stopped");
5658 let err = result.unwrap_err();
5659 assert_matches!(
5660 err.kind(),
5661 crate::mailbox::MailboxSenderErrorKind::Mailbox(mailbox_err)
5662 if matches!(
5663 mailbox_err.kind(),
5664 crate::mailbox::MailboxErrorKind::OwnerTerminated(ActorStatus::Stopped(reason)) if reason == "healthy shutdown"
5665 )
5666 );
5667 }
5668
5669 #[async_timed_test(timeout_secs = 30)]
5670 async fn test_mailbox_closed_with_owner_failed_reason() {
5671 let proc = Proc::isolated();
5672 let (client, _) = proc.client("client").unwrap();
5673 let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
5676
5677 let actor_handle = proc.spawn("test", TestActor).unwrap();
5678
5679 let handle_for_send = actor_handle.clone();
5681
5682 actor_handle.post(
5684 &client,
5685 TestActorMessage::Fail(anyhow::anyhow!("intentional failure")),
5686 );
5687 actor_handle.await;
5688
5689 let result = handle_for_send
5691 .port::<TestActorMessage>()
5692 .try_send(&client, TestActorMessage::Noop());
5693
5694 assert!(result.is_err(), "send should fail when actor has failed");
5695 let err = result.unwrap_err();
5696 assert_matches!(
5697 err.kind(),
5698 crate::mailbox::MailboxSenderErrorKind::Mailbox(mailbox_err)
5699 if matches!(
5700 mailbox_err.kind(),
5701 crate::mailbox::MailboxErrorKind::OwnerTerminated(ActorStatus::Failed(ActorErrorKind::Generic(msg)))
5702 if msg.contains("intentional failure")
5703 )
5704 );
5705 }
5706
5707 async fn wait_for_terminated_snapshot(
5711 proc: &Proc,
5712 actor_id: &ActorAddr,
5713 ) -> crate::introspect::IntrospectResult {
5714 for i in 0..1000 {
5718 if let Some(snapshot) = proc.terminated_snapshot(actor_id) {
5719 return snapshot;
5720 }
5721 if i < 50 {
5722 tokio::task::yield_now().await;
5723 } else {
5724 tokio::time::sleep(Duration::from_millis(50)).await;
5725 }
5726 }
5727 panic!("timed out waiting for terminated snapshot for {}", actor_id);
5728 }
5729
5730 #[async_timed_test(timeout_secs = 60)]
5739 async fn test_terminated_snapshot_stored_on_stop() {
5740 let proc = Proc::isolated();
5741 let (_client, _client_handle) = proc.client("client").unwrap();
5742
5743 let handle = proc.spawn::<TestActor>("actor", TestActor).unwrap();
5744 let actor_id = handle.actor_addr().clone();
5745
5746 assert!(proc.terminated_snapshot(&actor_id).is_none());
5748 assert!(!proc.all_terminated_actor_ids().contains(&actor_id));
5749
5750 handle.drain_and_stop("test").unwrap();
5752 handle.await;
5753
5754 let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
5757 let attrs: hyperactor_config::Attrs =
5758 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
5759 let status = attrs
5760 .get(crate::introspect::STATUS)
5761 .expect("must have status");
5762 assert!(
5763 status.starts_with("stopped"),
5764 "expected stopped status, got: {}",
5765 status
5766 );
5767
5768 assert!(proc.all_terminated_actor_ids().contains(&actor_id));
5770 assert!(
5771 !proc.all_actor_ids().contains(&actor_id),
5772 "stopped actor should not appear in live actor IDs"
5773 );
5774 }
5775
5776 #[async_timed_test(timeout_secs = 60)]
5783 async fn test_terminated_snapshot_stored_on_failure() {
5784 let proc = Proc::isolated();
5785 let (client, _client_handle) = proc.client("client").unwrap();
5786 ProcSupervisionCoordinator::set(&proc).await.unwrap();
5788
5789 let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
5790 let actor_id = handle.actor_addr().clone();
5791
5792 handle.post(&client, TestActorMessage::Fail(anyhow::anyhow!("boom")));
5794 handle.await;
5795
5796 let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
5797 let attrs: hyperactor_config::Attrs =
5798 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
5799 let status = attrs
5800 .get(crate::introspect::STATUS)
5801 .expect("must have status");
5802 assert!(
5803 status.starts_with("failed"),
5804 "expected failed status, got: {}",
5805 status
5806 );
5807 }
5808
5809 #[async_timed_test(timeout_secs = 30)]
5811 async fn test_supervision_event_stored_on_failure() {
5812 let proc = Proc::isolated();
5813 let (client, _client_handle) = proc.client("client").unwrap();
5814 ProcSupervisionCoordinator::set(&proc).await.unwrap();
5815
5816 let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
5817 let actor_id = handle.actor_addr().clone();
5818 let cell = handle.cell().clone();
5819
5820 handle.post(&client, TestActorMessage::Fail(anyhow::anyhow!("boom")));
5821 handle.await;
5822
5823 let event = cell.supervision_event();
5824 assert!(event.is_some(), "failed actor must have supervision_event");
5825 let event = event.unwrap();
5826 assert_eq!(event.actor_id, actor_id);
5827 assert!(event.actor_status.is_failed());
5828 assert_eq!(event.actually_failing_actor().unwrap().actor_id, actor_id);
5830 }
5831
5832 #[async_timed_test(timeout_secs = 30)]
5834 async fn test_supervision_event_on_clean_stop() {
5835 let proc = Proc::isolated();
5836 let (_client, _client_handle) = proc.client("client").unwrap();
5837
5838 let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
5839 let cell = handle.cell().clone();
5840
5841 handle.drain_and_stop("test").unwrap();
5842 handle.await;
5843
5844 let event = cell
5845 .supervision_event()
5846 .expect("cleanly stopped actor must have a supervision_event");
5847 assert!(
5848 matches!(event.actor_status, ActorStatus::Stopped(_)),
5849 "expected Stopped status, got {:?}",
5850 event.actor_status
5851 );
5852 assert!(!event.is_error());
5853 }
5854
5855 #[async_timed_test(timeout_secs = 30)]
5856 async fn test_supervision_coordinator_receives_clean_stop() {
5857 let proc = Proc::isolated();
5858 let (_client, _client_handle) = proc.client("client").unwrap();
5859 let (mut reported_event, _coordinator_handle) =
5860 ProcSupervisionCoordinator::set(&proc).await.unwrap();
5861
5862 let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
5863 let actor_id = handle.actor_addr().clone();
5864
5865 handle.drain_and_stop("test").unwrap();
5866 handle.await;
5867
5868 let event = reported_event.recv().await;
5869 assert_eq!(event.actor_id, actor_id);
5870 assert!(
5871 matches!(event.actor_status, ActorStatus::Stopped(_)),
5872 "expected Stopped status, got {:?}",
5873 event.actor_status
5874 );
5875 assert!(!event.is_error());
5876 }
5877
5878 #[async_timed_test(timeout_secs = 30)]
5879 async fn test_coordinator_shuts_down_last_during_destroy() {
5880 let mut proc = Proc::isolated();
5881 let (_client, _client_handle) = proc.client("client").unwrap();
5882 let (mut reported_event, _coordinator_handle) =
5883 ProcSupervisionCoordinator::set(&proc).await.unwrap();
5884
5885 let mut actor_ids = Vec::new();
5887 for i in 0..3 {
5888 let handle = proc
5889 .spawn::<TestActor>(&format!("actor_{i}"), TestActor)
5890 .unwrap();
5891 actor_ids.push(handle.actor_addr().clone());
5892 }
5893
5894 proc.destroy_and_wait(Duration::from_secs(5), "test")
5899 .await
5900 .unwrap();
5901
5902 let mut received_ids = Vec::new();
5904 for _ in 0..actor_ids.len() {
5905 let event = reported_event.recv().await;
5906 assert!(
5907 matches!(event.actor_status, ActorStatus::Stopped(_)),
5908 "expected Stopped, got {:?}",
5909 event.actor_status
5910 );
5911 received_ids.push(event.actor_id);
5912 }
5913 received_ids.sort();
5914 actor_ids.sort();
5915 assert_eq!(received_ids, actor_ids);
5916 }
5917
5918 #[async_timed_test(timeout_secs = 30)]
5920 async fn test_supervision_event_on_propagated_failure() {
5921 let proc = Proc::isolated();
5922 let (client, _client_handle) = proc.client("client").unwrap();
5923 ProcSupervisionCoordinator::set(&proc).await.unwrap();
5924
5925 let parent = proc.spawn::<TestActor>("parent", TestActor).unwrap();
5926 let parent_cell = parent.cell().clone();
5927 let (tx, rx) = oneshot::channel();
5929 parent.post(&client, TestActorMessage::Spawn(tx));
5930 let child = rx.await.unwrap();
5931 let child_id = child.actor_addr().clone();
5932
5933 child.post(
5936 &client,
5937 TestActorMessage::Fail(anyhow::anyhow!("child boom")),
5938 );
5939 parent.await;
5940
5941 let event = parent_cell.supervision_event();
5942 assert!(
5943 event.is_some(),
5944 "parent must have supervision_event from propagated failure"
5945 );
5946 let event = event.unwrap();
5947 assert_eq!(event.actually_failing_actor().unwrap().actor_id, child_id);
5949 }
5950
5951 #[async_timed_test(timeout_secs = 30)]
5960 async fn test_resolve_actor_ref_none_for_terminal_actor() {
5961 let proc = Proc::isolated();
5962 let (_client, _client_handle) = proc.client("client").unwrap();
5963
5964 let handle = proc.spawn::<TestActor>("target", TestActor).unwrap();
5965 let actor_ref: ActorRef<TestActor> = handle.bind();
5966
5967 assert!(
5969 proc.resolve_actor_ref(&actor_ref).is_some(),
5970 "live actor should be resolvable"
5971 );
5972
5973 handle.drain_and_stop("test").unwrap();
5974 handle.await;
5975
5976 assert!(
5979 proc.resolve_actor_ref(&actor_ref).is_none(),
5980 "terminal actor must not be resolvable"
5981 );
5982 }
5983
5984 #[async_timed_test(timeout_secs = 60)]
5986 async fn test_terminated_snapshot_has_failure_info() {
5987 let proc = Proc::isolated();
5988 let (client, _client_handle) = proc.client("client").unwrap();
5989 ProcSupervisionCoordinator::set(&proc).await.unwrap();
5990
5991 let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
5992 let actor_id = handle.actor_addr().clone();
5993
5994 handle.post(&client, TestActorMessage::Fail(anyhow::anyhow!("kaboom")));
5995 handle.await;
5996
5997 let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
5998 let attrs: hyperactor_config::Attrs =
5999 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
6000 let status = attrs
6001 .get(crate::introspect::STATUS)
6002 .expect("must have status");
6003 assert!(
6004 status.starts_with("failed"),
6005 "expected failed status, got: {}",
6006 status
6007 );
6008 let err_msg = attrs
6009 .get(crate::introspect::FAILURE_ERROR_MESSAGE)
6010 .expect("failed actor must have failure_error_message");
6011 assert!(!err_msg.is_empty());
6012 let root_cause = attrs
6013 .get(crate::introspect::FAILURE_ROOT_CAUSE_ACTOR)
6014 .expect("must have root_cause_actor");
6015 assert_eq!(root_cause, &actor_id);
6016 assert_eq!(
6017 attrs.get(crate::introspect::FAILURE_IS_PROPAGATED),
6018 Some(&false)
6019 );
6020 assert!(
6021 attrs.get(crate::introspect::FAILURE_OCCURRED_AT).is_some(),
6022 "failed actor must have occurred_at"
6023 );
6024 }
6025
6026 #[async_timed_test(timeout_secs = 60)]
6028 async fn test_propagated_failure_info() {
6029 let proc = Proc::isolated();
6030 let (client, _client_handle) = proc.client("client").unwrap();
6031 ProcSupervisionCoordinator::set(&proc).await.unwrap();
6032
6033 let parent = proc.spawn::<TestActor>("parent", TestActor).unwrap();
6034 let parent_id = parent.actor_addr().clone();
6035
6036 let (tx, rx) = oneshot::channel();
6037 parent.post(&client, TestActorMessage::Spawn(tx));
6038 let child = rx.await.unwrap();
6039 let child_id = child.actor_addr().clone();
6040
6041 child.post(
6042 &client,
6043 TestActorMessage::Fail(anyhow::anyhow!("child fail")),
6044 );
6045 parent.await;
6046
6047 let snapshot = wait_for_terminated_snapshot(&proc, &parent_id).await;
6048 let attrs: hyperactor_config::Attrs =
6049 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
6050 let root_cause = attrs
6051 .get(crate::introspect::FAILURE_ROOT_CAUSE_ACTOR)
6052 .expect("propagated failure must have root_cause_actor");
6053 assert_eq!(root_cause, &child_id);
6054 assert_eq!(
6055 attrs.get(crate::introspect::FAILURE_IS_PROPAGATED),
6056 Some(&true)
6057 );
6058 }
6059
6060 #[async_timed_test(timeout_secs = 30)]
6062 async fn test_spawn_with_name_creates_descriptive_name() {
6063 let proc = Proc::isolated();
6064 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6065 let handle = proc
6066 .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
6067 .unwrap();
6068 assert_eq!(
6069 handle.actor_addr().label().unwrap().as_str(),
6070 "my_controller"
6071 );
6072 assert!(!handle.actor_addr().is_root());
6073 }
6074
6075 #[async_timed_test(timeout_secs = 30)]
6077 async fn test_spawn_with_name_increments_index() {
6078 let proc = Proc::isolated();
6079 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6080 let first = proc
6081 .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
6082 .unwrap();
6083 let second = proc
6084 .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
6085 .unwrap();
6086 assert_ne!(first.actor_addr().uid(), second.actor_addr().uid());
6087 }
6088
6089 #[async_timed_test(timeout_secs = 30)]
6092 async fn test_spawn_with_name_preserves_supervision() {
6093 let proc = Proc::isolated();
6094 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6095 let child = proc
6096 .spawn_named_child(root.cell().clone(), "supervised_child", TestActor)
6097 .unwrap();
6098 let child_cell = child.cell();
6099 let parent = child_cell.parent().expect("named child must have a parent");
6100 assert_eq!(parent.actor_addr(), root.actor_addr());
6101 }
6102
6103 #[async_timed_test(timeout_secs = 30)]
6105 async fn test_spawn_unchanged() {
6106 let proc = Proc::isolated();
6107 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6108 let child = proc.spawn_child(root.cell().clone(), TestActor).unwrap();
6109 assert!(!child.actor_addr().is_root());
6110 }
6111
6112 #[async_timed_test(timeout_secs = 30)]
6114 async fn test_spawn_with_name_different_names_different_pids() {
6115 let proc = Proc::isolated();
6116 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6117 let a = proc
6118 .spawn_named_child(root.cell().clone(), "controller_a", TestActor)
6119 .unwrap();
6120 let b = proc
6121 .spawn_named_child(root.cell().clone(), "controller_b", TestActor)
6122 .unwrap();
6123 assert_ne!(a.actor_addr().uid(), b.actor_addr().uid());
6124 assert_eq!(a.actor_addr().label().unwrap().as_str(), "controller_a");
6125 assert_eq!(b.actor_addr().label().unwrap().as_str(), "controller_b");
6126 }
6127
6128 #[async_timed_test(timeout_secs = 30)]
6130 async fn test_spawn_with_name_no_child_overwrite() {
6131 let proc = Proc::isolated();
6132 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6133 let _a = proc
6134 .spawn_named_child(root.cell().clone(), "ctrl", TestActor)
6135 .unwrap();
6136 let _b = proc
6137 .spawn_named_child(root.cell().clone(), "ctrl", TestActor)
6138 .unwrap();
6139 let _c = proc.spawn_child(root.cell().clone(), TestActor).unwrap();
6140 assert_eq!(root.cell().child_count(), 3);
6141 }
6142
6143 #[async_timed_test(timeout_secs = 30)]
6145 async fn test_spawn_with_name_does_not_pollute_roots() {
6146 let proc = Proc::isolated();
6147 let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6148 let _child = proc
6149 .spawn_named_child(root.cell().clone(), "foo", TestActor)
6150 .unwrap();
6151 let result = proc.spawn::<TestActor>("foo", TestActor);
6154 assert!(result.is_ok(), "named child should not pollute roots");
6155 }
6156
6157 #[async_timed_test(timeout_secs = 30)]
6159 async fn test_ai3_controller_actor_ids_unique_across_parents_same_proc() {
6160 let proc = Proc::isolated();
6161 let parent_a = proc.spawn::<TestActor>("parent_a", TestActor).unwrap();
6162 let parent_b = proc.spawn::<TestActor>("parent_b", TestActor).unwrap();
6163
6164 let ctrl_a = proc
6166 .spawn_named_child(parent_a.cell().clone(), "controller_mesh_a", TestActor)
6167 .unwrap();
6168 let ctrl_b = proc
6169 .spawn_named_child(parent_b.cell().clone(), "controller_mesh_b", TestActor)
6170 .unwrap();
6171
6172 assert_ne!(
6173 ctrl_a.actor_addr(),
6174 ctrl_b.actor_addr(),
6175 "controller ActorAddrs must be unique across parents"
6176 );
6177 }
6178
6179 #[async_timed_test(timeout_secs = 30)]
6181 async fn test_ai3_no_controller_overwrite_in_parent_or_proc_maps() {
6182 let proc = Proc::isolated();
6183 let parent_a = proc.spawn::<TestActor>("parent_a", TestActor).unwrap();
6184 let parent_b = proc.spawn::<TestActor>("parent_b", TestActor).unwrap();
6185
6186 let ctrl_a = proc
6187 .spawn_named_child(parent_a.cell().clone(), "controller_mesh_a", TestActor)
6188 .unwrap();
6189 let ctrl_b = proc
6190 .spawn_named_child(parent_b.cell().clone(), "controller_mesh_b", TestActor)
6191 .unwrap();
6192
6193 assert!(
6195 proc.get_instance(ctrl_a.actor_addr()).is_some(),
6196 "ctrl_a must be resolvable"
6197 );
6198 assert!(
6199 proc.get_instance(ctrl_b.actor_addr()).is_some(),
6200 "ctrl_b must be resolvable"
6201 );
6202 assert_eq!(parent_a.cell().child_count(), 1);
6204 assert_eq!(parent_b.cell().child_count(), 1);
6205 }
6206
6207 #[async_timed_test(timeout_secs = 60)]
6209 async fn test_stopped_snapshot_has_no_failure_info() {
6210 let proc = Proc::isolated();
6211 let (_client, _client_handle) = proc.client("client").unwrap();
6212
6213 let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
6214 let actor_id = handle.actor_addr().clone();
6215
6216 handle.drain_and_stop("test").unwrap();
6217 handle.await;
6218
6219 let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
6220 let attrs: hyperactor_config::Attrs =
6221 serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
6222 let status = attrs
6223 .get(crate::introspect::STATUS)
6224 .expect("must have status");
6225 assert!(
6226 status.starts_with("stopped"),
6227 "expected stopped, got: {}",
6228 status
6229 );
6230 assert!(
6231 attrs
6232 .get(crate::introspect::FAILURE_ERROR_MESSAGE)
6233 .is_none(),
6234 "stopped actor must not have failure attrs"
6235 );
6236 }
6237
6238 #[async_timed_test(timeout_secs = 10)]
6245 async fn test_queue_depth_increment_decrement() {
6246 let proc = Proc::isolated();
6247 let (client, _) = proc.client("client").unwrap();
6248 let handle = proc.spawn("qd_test", TestActor).unwrap();
6249 let actor_ref: crate::ActorRef<TestActor> = handle.bind();
6250 let actor_id = actor_ref.actor_addr().clone();
6251
6252 let cell = proc.get_instance(&actor_id).expect("actor exists");
6254 assert_eq!(cell.queue_depth(), 0, "initial queue depth should be 0");
6255
6256 let (reply_tx, reply_rx) = oneshot::channel();
6259 let (gate_tx, gate_rx) = oneshot::channel::<()>();
6260 handle.wait(&client, reply_tx, gate_rx).await.unwrap();
6261
6262 reply_rx.await.unwrap();
6264
6265 let (reply2_tx, reply2_rx) = oneshot::channel();
6267 handle.reply(&client, reply2_tx).await.unwrap();
6268
6269 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
6271
6272 let depth = cell.queue_depth();
6274 assert!(
6275 depth >= 1,
6276 "expected queue depth >= 1 while actor is busy, got {depth}"
6277 );
6278
6279 let _ = gate_tx.send(());
6281
6282 reply2_rx.await.unwrap();
6284
6285 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
6287
6288 let depth = cell.queue_depth();
6290 assert_eq!(
6291 depth, 0,
6292 "queue depth should return to 0 after all messages handled"
6293 );
6294 }
6295
6296 #[async_timed_test(timeout_secs = 10)]
6300 async fn test_proc_queue_depth_aggregation_under_pressure() {
6301 let proc = Proc::isolated();
6302 let (client, _) = proc.client("client").unwrap();
6303
6304 let h1 = proc.spawn("a1", TestActor).unwrap();
6306 let h2 = proc.spawn("a2", TestActor).unwrap();
6307
6308 let (reply1, rx1) = oneshot::channel();
6310 let (gate1, grx1) = oneshot::channel::<()>();
6311 h1.wait(&client, reply1, grx1).await.unwrap();
6312 rx1.await.unwrap();
6313
6314 let (reply2, rx2) = oneshot::channel();
6315 let (gate2, grx2) = oneshot::channel::<()>();
6316 h2.wait(&client, reply2, grx2).await.unwrap();
6317 rx2.await.unwrap();
6318
6319 h1.noop(&client).await.unwrap();
6321 h1.noop(&client).await.unwrap();
6322 h2.noop(&client).await.unwrap();
6323
6324 let aggregate = || -> (u64, u64) {
6327 let mut total: u64 = 0;
6328 let mut max: u64 = 0;
6329 for actor_id in proc.all_instance_keys() {
6330 if let Some(cell) = proc.get_instance_by_id(&actor_id) {
6331 let depth = cell.queue_depth();
6332 total = total.saturating_add(depth);
6333 max = max.max(depth);
6334 }
6335 }
6336 (total, max)
6337 };
6338
6339 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
6342 loop {
6343 let (total, max) = aggregate();
6344 if total >= 3 {
6345 assert!(max >= 1, "expected max >= 1, got {max}");
6346 assert!(max <= total, "PD-1: max ({max}) <= total ({total})");
6347 break;
6348 }
6349 assert!(
6350 tokio::time::Instant::now() < deadline,
6351 "timed out waiting for queue depth >= 3, got {total}",
6352 );
6353 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
6354 }
6355
6356 let _ = gate1.send(());
6358 let _ = gate2.send(());
6359
6360 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
6362 loop {
6363 let (total, _) = aggregate();
6364 if total == 0 {
6365 break;
6366 }
6367 assert!(
6368 tokio::time::Instant::now() < deadline,
6369 "timed out waiting for queue depth to return to 0, got {total}",
6370 );
6371 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
6372 }
6373 }
6374
6375 #[async_timed_test(timeout_secs = 5)]
6380 async fn test_retained_queue_stats_cold_start() {
6381 let proc = Proc::isolated();
6382 assert_eq!(proc.queue_depth_total(), 0);
6383 assert_eq!(proc.queue_depth_high_water_mark(), 0);
6384 assert_eq!(proc.last_nonzero_queue_depth_age_ms(), None);
6385 }
6386
6387 #[async_timed_test(timeout_secs = 10)]
6390 async fn test_retained_queue_stats_burst_then_drain() {
6391 let proc = Proc::isolated();
6392 let (client, _) = proc.client("client").unwrap();
6393 let h = proc.spawn("ret_test", TestActor).unwrap();
6394
6395 let (ready_tx, ready_rx) = oneshot::channel();
6397 let (gate_tx, gate_rx) = oneshot::channel::<()>();
6398 h.wait(&client, ready_tx, gate_rx).await.unwrap();
6399 ready_rx.await.unwrap();
6400
6401 h.noop(&client).await.unwrap();
6403 h.noop(&client).await.unwrap();
6404
6405 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
6407 loop {
6408 let hwm = proc.queue_depth_high_water_mark();
6409 if hwm >= 2 {
6410 assert!(hwm >= proc.queue_depth_total());
6412 let age = proc.last_nonzero_queue_depth_age_ms();
6414 assert!(
6415 age.is_some(),
6416 "last-nonzero should be Some while pressure is active"
6417 );
6418 assert!(age.unwrap() < 2000, "last-nonzero age should be near zero");
6419 break;
6420 }
6421 assert!(
6422 tokio::time::Instant::now() < deadline,
6423 "timed out waiting for watermark >= 2",
6424 );
6425 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
6426 }
6427
6428 let _ = gate_tx.send(());
6430 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
6431 loop {
6432 if proc.queue_depth_total() == 0 {
6433 break;
6434 }
6435 assert!(
6436 tokio::time::Instant::now() < deadline,
6437 "timed out waiting for total to drain",
6438 );
6439 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
6440 }
6441
6442 assert!(
6444 proc.queue_depth_high_water_mark() >= 2,
6445 "watermark should retain the peak after drain",
6446 );
6447
6448 let age = proc.last_nonzero_queue_depth_age_ms();
6450 assert!(age.is_some(), "last-nonzero should be Some after pressure");
6451 }
6452
#[test]
fn test_last_nonzero_refreshed_on_dequeue_deterministic() {
    use std::sync::atomic::AtomicU64;

    // Manually advanced clock: every timestamp the stats record is known.
    static FAKE_NOW: AtomicU64 = AtomicU64::new(0);
    fn fake_clock() -> u64 {
        FAKE_NOW.load(Ordering::Relaxed)
    }
    let advance_to = |ms: u64| FAKE_NOW.store(ms, Ordering::Relaxed);

    let stats = ProcQueueStats::with_clock(fake_clock);
    let depth = Arc::new(AtomicU64::new(0));

    // Nothing has been queued yet, so there is no last-nonzero stamp.
    assert_eq!(stats.last_nonzero_age_ms(), None);

    // t=1000: a burst of two enqueues.
    advance_to(1000);
    for _ in 0..2 {
        account_enqueue(&depth, &stats, "a");
    }
    assert_eq!(stats.running_total(), 2);
    assert_eq!(stats.high_water_mark(), 2);

    // The stamp taken at t=1000 is now 1000ms old.
    advance_to(2000);
    assert_eq!(stats.last_nonzero_age_ms(), Some(1000));

    // t=3000: one dequeue leaves a message pending.
    advance_to(3000);
    account_dequeue(&depth, &stats, "a");
    assert_eq!(stats.running_total(), 1);

    // Age is 1000ms again, i.e. the stamp moved forward to t=3000.
    advance_to(4000);
    assert_eq!(stats.last_nonzero_age_ms(), Some(1000));

    // t=5000: the final dequeue empties the queue.
    advance_to(5000);
    account_dequeue(&depth, &stats, "a");
    assert_eq!(stats.running_total(), 0);

    // Age is 3000ms, so draining to zero left the stamp at t=3000.
    advance_to(6000);
    assert_eq!(stats.last_nonzero_age_ms(), Some(3000));

    // The peak survives the drain.
    assert_eq!(stats.high_water_mark(), 2);
}
6508
#[test]
fn test_account_cancel_enqueue_restores_counters() {
    let stats = ProcQueueStats::new();
    let depth = Arc::new(AtomicU64::new(0));

    // One accounted enqueue bumps both the proc-wide total and the
    // depth counter passed in.
    account_enqueue(&depth, &stats, "a");
    assert_eq!(stats.running_total(), 1);
    assert_eq!(depth.load(Ordering::Relaxed), 1);

    // Cancelling that enqueue must undo both counters.
    account_cancel_enqueue(&depth, &stats, "a");
    assert_eq!(
        stats.running_total(),
        0,
        "cancel must restore running_total"
    );
    assert_eq!(
        depth.load(Ordering::Relaxed),
        0,
        "cancel must restore queue_depth"
    );

    // The watermark keeps the peak it already observed; cancel does not lower it.
    assert_eq!(stats.high_water_mark(), 1);

    // Re-enqueueing after a cancel must start from the restored state:
    // counters reach 1 (not 2) and the recorded peak is not inflated.
    account_enqueue(&depth, &stats, "a");
    assert_eq!(stats.running_total(), 1);
    assert_eq!(
        depth.load(Ordering::Relaxed),
        1,
        "re-enqueue after cancel must not double count queue_depth"
    );
    assert_eq!(
        stats.high_water_mark(),
        1,
        "a cancelled enqueue must not inflate the watermark"
    );
}
6543}