1#![allow(unused_assignments)]
16
17use std::collections::HashMap;
18use std::mem::take;
19use std::sync::Arc;
20use std::sync::Mutex;
21use std::sync::RwLock;
22use std::sync::RwLockReadGuard;
23use std::time::Duration;
24
25use async_trait::async_trait;
26use enum_as_inner::EnumAsInner;
27use hyperactor::Actor;
28use hyperactor::ActorHandle;
29use hyperactor::Bind;
30use hyperactor::Context;
31use hyperactor::Data;
32use hyperactor::HandleClient;
33use hyperactor::Handler;
34use hyperactor::Instance;
35use hyperactor::PortHandle;
36use hyperactor::RefClient;
37use hyperactor::Unbind;
38use hyperactor::actor::handle_undeliverable_message;
39use hyperactor::actor::remote::Remote;
40use hyperactor::channel;
41use hyperactor::channel::ChannelAddr;
42use hyperactor::mailbox::BoxedMailboxSender;
43use hyperactor::mailbox::DialMailboxRouter;
44use hyperactor::mailbox::IntoBoxedMailboxSender;
45use hyperactor::mailbox::MailboxClient;
46use hyperactor::mailbox::MailboxSender;
47use hyperactor::mailbox::MessageEnvelope;
48use hyperactor::mailbox::Undeliverable;
49use hyperactor::proc::Proc;
50use hyperactor::reference as hyperactor_reference;
51use hyperactor::supervision::ActorSupervisionEvent;
52use hyperactor_config::CONFIG;
53use hyperactor_config::ConfigAttr;
54use hyperactor_config::Flattrs;
55use hyperactor_config::attrs::declare_attrs;
56use serde::Deserialize;
57use serde::Serialize;
58use typeuri::Named;
59
60use crate::Name;
61use crate::config_dump::ConfigDump;
62use crate::config_dump::ConfigDumpResult;
63use crate::pyspy::PySpyDump;
64use crate::pyspy::PySpyWorker;
65use crate::resource;
66
/// Actor name under which `ProcAgent::boot_v1` spawns the agent on its proc.
pub const PROC_AGENT_ACTOR_NAME: &str = "proc_agent";
69
declare_attrs! {
    // Period of the agent's orphan self-check (`SelfCheck`): on each check, actors whose
    // keepalive deadline has passed are stopped. A zero duration disables the check
    // (`ProcAgent::boot_v1` maps zero to `None`). The two `ConfigAttr::new` arguments
    // are presumably the environment-variable name and the config key — confirm.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ORPHAN_TIMEOUT".to_string()),
        Some("mesh_orphan_timeout".to_string()),
    ))
    pub attr MESH_ORPHAN_TIMEOUT: Duration = Duration::from_secs(60);

    // Message header set on state updates sent to `StreamState` subscribers, so that an
    // undeliverable update can be recognized and the dead subscriber pruned.
    attr STREAM_STATE_SUBSCRIBER: bool;
}
83
/// Reply sent on the `status_port` of a `MeshAgentMessage::Gspawn` request.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Named)]
pub enum GspawnResult {
    /// The actor was spawned; carries the agent's configured rank and the new actor's id.
    Success {
        rank: usize,
        actor_id: hyperactor_reference::ActorId,
    },
    /// Spawning failed; carries a human-readable description of the failure.
    Error(String),
}
wirevalue::register_type!(GspawnResult);
93
/// Self-message that triggers a deferred re-publish of the proc's introspection
/// attributes. The supervision-event handler sends it (with a short delay) when it marks
/// the attributes dirty.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
struct RepublishIntrospect;
wirevalue::register_type!(RepublishIntrospect);
110
111fn collect_live_children(
117 proc: &hyperactor::Proc,
118) -> (
119 Vec<hyperactor::introspect::IntrospectRef>,
120 Vec<crate::introspect::NodeRef>,
121) {
122 let all_keys = proc.all_instance_keys();
123 let mut children = Vec::with_capacity(all_keys.len());
124 let mut system_children = Vec::new();
125 for id in all_keys {
126 if let Some(cell) = proc.get_instance(&id) {
127 if cell.is_system() {
128 system_children.push(crate::introspect::NodeRef::Actor(id.clone()));
129 }
130 children.push(hyperactor::introspect::IntrospectRef::Actor(id));
131 }
132 }
133 (children, system_children)
134}
135
/// Messages handled by the agent on the v0 (configure-then-spawn) path.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub(crate) enum MeshAgentMessage {
    /// Configure an unconfigured v0 agent; rejected if the agent is already configured.
    Configure {
        /// Rank assigned to this agent's proc.
        rank: usize,
        /// Forwarder address; it is dialed and used as the router's default sender.
        forwarder: ChannelAddr,
        /// Optional port to which supervision events are forwarded.
        supervisor: Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>,
        /// Proc addresses bound as direct routes in the agent's router.
        address_book: HashMap<hyperactor_reference::ProcId, ChannelAddr>,
        /// Port that receives the agent's rank once configuration has completed.
        configured: hyperactor_reference::PortRef<usize>,
        /// Whether supervision events are recorded against tracked actors.
        record_supervision_events: bool,
    },

    /// Query the agent's rank; answered with `(rank, true)` once configured.
    Status {
        status: hyperactor_reference::PortRef<(usize, bool)>,
    },

    /// Spawn an actor on this proc (v0 only).
    Gspawn {
        /// Actor type name, resolved through the agent's `Remote` registry.
        actor_type: String,
        /// Name to give the spawned actor.
        actor_name: String,
        /// Encoded spawn parameters passed through to `Remote::gspawn`.
        params_data: Data,
        /// Port that receives the spawn outcome.
        status_port: hyperactor_reference::PortRef<GspawnResult>,
    },
}
184
/// Configuration state of the agent.
#[derive(Debug, EnumAsInner, Default)]
enum State {
    /// v0 agent before `Configure`: holds the proc's not-yet-configured mailbox sender.
    UnconfiguredV0 {
        sender: ReconfigurableMailboxSender,
    },

    /// v0 agent after `Configure`: the sender has been routed; rank/supervisor are known.
    ConfiguredV0 {
        sender: ReconfigurableMailboxSender,
        rank: usize,
        supervisor: Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>,
    },

    /// Agent booted via `ProcAgent::boot_v1`; it has no rank or v0 configuration.
    V1,

    /// Placeholder left behind by `std::mem::take` during state transitions.
    #[default]
    Invalid,
}
203
204impl State {
205 fn rank(&self) -> Option<usize> {
206 match self {
207 State::ConfiguredV0 { rank, .. } => Some(*rank),
208 _ => None,
209 }
210 }
211
212 fn supervisor(&self) -> Option<hyperactor_reference::PortRef<ActorSupervisionEvent>> {
213 match self {
214 State::ConfiguredV0 { supervisor, .. } => supervisor.clone(),
215 _ => None,
216 }
217 }
218}
219
/// Bookkeeping for one actor created through `resource::CreateOrUpdate`.
#[derive(Debug)]
struct ActorInstanceState {
    /// Rank reported for this actor in status overlays and `ActorState`.
    create_rank: usize,
    /// Spawn outcome: the actor id, or why the spawn failed or was refused.
    spawn: Result<hyperactor_reference::ActorId, anyhow::Error>,
    /// Set once the agent has asked for this actor to be stopped.
    stop_initiated: bool,
    /// Supervision event recorded for this actor, if one has been received.
    supervision_event: Option<ActorSupervisionEvent>,
    /// Ports registered via `StreamState` that receive state updates.
    subscribers: Vec<hyperactor_reference::PortRef<resource::State<ActorState>>>,
    /// Keepalive deadline; once it passes, the orphan self-check stops the actor.
    expiry_time: Option<std::time::SystemTime>,
    /// Starts at 1; bumped when the agent records a stop request or a supervision event.
    generation: u64,
    /// Outstanding `WaitRankStatus` requests. Each reply is sent once the actor's status
    /// reaches the paired minimum status.
    pending_wait_status: Vec<(
        resource::Status,
        hyperactor_reference::PortRef<crate::StatusOverlay>,
    )>,
}
250
251impl ActorInstanceState {
252 fn status(&self) -> resource::Status {
255 match &self.spawn {
256 Err(e) => resource::Status::Failed(e.to_string()),
257 Ok(_) => match &self.supervision_event {
258 Some(event) if event.is_error() => resource::Status::Failed(format!("{}", event)),
259 Some(_) => resource::Status::Stopped,
260 None if self.stop_initiated => resource::Status::Stopping,
261 None => resource::Status::Running,
262 },
263 }
264 }
265
266 fn is_terminal(&self) -> bool {
269 match &self.spawn {
270 Err(_) => true,
271 Ok(_) => self.supervision_event.is_some(),
272 }
273 }
274
275 fn has_errors(&self) -> bool {
277 self.supervision_event
278 .as_ref()
279 .is_some_and(|e| e.is_error())
280 }
281
282 fn to_state(&self, name: &Name) -> resource::State<ActorState> {
285 let status = self.status();
286 let actor_state = self.spawn.as_ref().ok().map(|actor_id| ActorState {
287 actor_id: actor_id.clone(),
288 create_rank: self.create_rank,
289 supervision_events: self.supervision_event.clone().into_iter().collect(),
290 });
291 resource::State {
292 name: name.clone(),
293 status,
294 state: actor_state,
295 generation: self.generation,
296 timestamp: std::time::SystemTime::now(),
297 }
298 }
299
300 fn notify_status_changed(&mut self, cx: &impl hyperactor::context::Actor, name: &Name) {
305 let state = self.to_state(name);
307 for subscriber in &self.subscribers {
308 let mut headers = Flattrs::new();
309 headers.set(STREAM_STATE_SUBSCRIBER, true);
310 if let Err(e) = subscriber.send_with_headers(cx, headers, state.clone()) {
311 tracing::warn!(
312 "failed to send state update to subscriber {}: {}",
313 subscriber.port_id(),
314 e,
315 );
316 }
317 }
318
319 let status = self.status();
321 self.pending_wait_status.retain(|(min_status, reply)| {
322 if status >= *min_status {
323 let rank = self.create_rank;
324 let overlay =
325 crate::StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status.clone())])
326 .expect("valid single-run overlay");
327 let _ = reply.send(cx, overlay);
328 false
329 } else {
330 true
331 }
332 });
333 }
334}
335
/// Periodic self-message that drives the orphan check. On each delivery the agent stops
/// actors whose keepalive has expired and schedules the next check.
#[derive(
    Clone,
    Debug,
    Default,
    PartialEq,
    Serialize,
    Deserialize,
    Named,
    Bind,
    Unbind
)]
struct SelfCheck {}
348
/// Per-proc agent. It spawns, stops, tracks and reports on actors hosted by its proc, and
/// serves the proc's introspection data.
#[hyperactor::export(
    handlers=[
        MeshAgentMessage,
        ActorSupervisionEvent,
        resource::CreateOrUpdate<ActorSpec> { cast = true },
        resource::Stop { cast = true },
        resource::StopAll { cast = true },
        resource::GetState<ActorState> { cast = true },
        resource::StreamState<ActorState> { cast = true },
        resource::KeepaliveGetState<ActorState> { cast = true },
        resource::GetRankStatus { cast = true },
        resource::WaitRankStatus { cast = true },
        RepublishIntrospect { cast = true },
        PySpyDump,
        ConfigDump,
    ]
)]
pub struct ProcAgent {
    /// The proc this agent manages.
    proc: Proc,
    /// Registry used to spawn actors by type name.
    remote: Remote,
    /// v0/v1 configuration state.
    state: State,
    /// Actors created via `resource::CreateOrUpdate`, keyed by resource name.
    actor_states: HashMap<Name, ActorInstanceState>,
    /// Whether supervision events are recorded against `actor_states`. Set to true by
    /// `boot_v1`; set by `Configure` on v0.
    record_supervision_events: bool,
    /// Set while a `RepublishIntrospect` self-message is pending.
    introspect_dirty: bool,
    /// If present, `shutdown` sends the exit code here instead of exiting the process.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<i32>>,
    /// Set by `StopAll`. While set, the agent shuts down once all tracked actors are
    /// terminal.
    stopping_all: bool,
    /// Period of the orphan self-check; `None` disables it.
    mesh_orphan_timeout: Option<Duration>,
}
409
impl ProcAgent {
    /// Creates a v0 proc and spawns the agent on it under the name `"mesh"`.
    ///
    /// The proc's mailbox sender is a `ReconfigurableMailboxSender` that starts
    /// unconfigured. It is shared with the agent, which configures it when it receives
    /// `Configure`. Returns the proc together with a handle to the spawned agent.
    #[hyperactor::observe_result("MeshAgent")]
    pub(crate) async fn bootstrap(
        proc_id: hyperactor_reference::ProcId,
    ) -> Result<(Proc, ActorHandle<Self>), anyhow::Error> {
        let sender = ReconfigurableMailboxSender::new();
        let proc = Proc::configured(proc_id.clone(), BoxedMailboxSender::new(sender.clone()));

        let agent = ProcAgent {
            proc: proc.clone(),
            remote: Remote::collect(),
            state: State::UnconfiguredV0 { sender },
            actor_states: HashMap::new(),
            record_supervision_events: false,
            introspect_dirty: false,
            shutdown_tx: None,
            stopping_all: false,
            mesh_orphan_timeout: None,
        };
        let handle = proc.spawn::<Self>("mesh", agent)?;
        Ok((proc, handle))
    }

    /// Spawns a v1 agent on an existing proc under `PROC_AGENT_ACTOR_NAME`.
    ///
    /// Supervision events are always recorded. The orphan self-check period is read from
    /// `MESH_ORPHAN_TIMEOUT`, and a zero duration disables it. When `shutdown_tx` is
    /// provided, shutdown is signalled through it instead of exiting the process.
    pub(crate) fn boot_v1(
        proc: Proc,
        shutdown_tx: Option<tokio::sync::oneshot::Sender<i32>>,
    ) -> Result<ActorHandle<Self>, anyhow::Error> {
        let orphan_timeout = hyperactor_config::global::get(MESH_ORPHAN_TIMEOUT);
        // Zero means "disabled": `init` then never schedules a `SelfCheck`.
        let orphan_timeout = if orphan_timeout.is_zero() {
            None
        } else {
            Some(orphan_timeout)
        };
        let agent = ProcAgent {
            proc: proc.clone(),
            remote: Remote::collect(),
            state: State::V1,
            actor_states: HashMap::new(),
            record_supervision_events: true,
            introspect_dirty: false,
            shutdown_tx,
            stopping_all: false,
            mesh_orphan_timeout: orphan_timeout,
        };
        proc.spawn::<Self>(PROC_AGENT_ACTOR_NAME, agent)
    }

    /// True when every tracked actor is terminal. Vacuously true when none are tracked.
    fn all_actors_terminal(&self) -> bool {
        self.actor_states.values().all(|state| state.is_terminal())
    }

    /// Flushes the proc (bounded by `FORWARDER_FLUSH_TIMEOUT`), then terminates.
    ///
    /// The exit code is 1 if any tracked actor recorded an error supervision event, and 0
    /// otherwise. Flush failures and timeouts are logged, not propagated. If a
    /// `shutdown_tx` was supplied, the exit code is sent there and this returns;
    /// otherwise the process exits.
    ///
    /// NOTE(review): `shutdown_tx` is taken on first use. A second call (for example a
    /// repeated `StopAll`, or a later supervision event while `stopping_all` is set) falls
    /// through to `std::process::exit` — confirm this is intended.
    async fn shutdown(&mut self) {
        let has_errors = self.actor_states.values().any(|state| state.has_errors());
        let exit_code = if has_errors { 1 } else { 0 };

        let flush_timeout =
            hyperactor_config::global::get(hyperactor::config::FORWARDER_FLUSH_TIMEOUT);
        match tokio::time::timeout(flush_timeout, self.proc.flush()).await {
            Ok(Err(err)) => {
                tracing::warn!("forwarder flush failed during shutdown: {}", err);
            }
            Err(_elapsed) => {
                tracing::warn!("forwarder flush timed out during shutdown");
            }
            Ok(Ok(())) => {}
        }

        tracing::info!(
            "shutting down process after all actors reached terminal state (exit_code={})",
            exit_code,
        );

        if let Some(tx) = self.shutdown_tx.take() {
            // The receiver's failure to receive is deliberately ignored.
            let _ = tx.send(exit_code);
            return;
        }
        std::process::exit(exit_code);
    }

    /// Logs the stop request and asks the proc to stop `actor_id` with `reason`.
    fn stop_actor_by_id(&self, actor_id: &hyperactor_reference::ActorId, reason: &str) {
        tracing::info!(
            name = "StopActor",
            %actor_id,
            actor_name = actor_id.name(),
            %reason,
        );
        self.proc.stop_actor(actor_id, reason.to_string());
    }

    /// Publishes this proc's introspection attributes on the agent's instance.
    ///
    /// Children include live actors plus the terminated actors the proc still reports.
    /// Stopped and system subsets are listed separately. The proc is marked poisoned when
    /// any tracked actor recorded an error supervision event.
    fn publish_introspect_properties(&self, cx: &impl hyperactor::context::Actor) {
        let (mut children, mut system_children) = collect_live_children(&self.proc);

        let mut stopped_children: Vec<crate::introspect::NodeRef> = Vec::new();
        // Fold in terminated actors. Each is always listed as stopped. It is listed as a
        // system child when its snapshot says so (unparseable snapshot attrs count as
        // non-system). It is added to `children` unless already present as a live entry.
        for id in self.proc.all_terminated_actor_ids() {
            let child_ref = hyperactor::introspect::IntrospectRef::Actor(id.clone());
            let node_ref = crate::introspect::NodeRef::Actor(id.clone());
            stopped_children.push(node_ref.clone());
            if let Some(snapshot) = self.proc.terminated_snapshot(&id) {
                let snapshot_attrs: hyperactor_config::Attrs =
                    serde_json::from_str(&snapshot.attrs).unwrap_or_default();
                if snapshot_attrs
                    .get(hyperactor::introspect::IS_SYSTEM)
                    .copied()
                    .unwrap_or(false)
                {
                    system_children.push(node_ref);
                }
            }
            if !children.contains(&child_ref) {
                children.push(child_ref);
            }
        }

        let stopped_retention_cap =
            hyperactor_config::global::get(hyperactor::config::TERMINATED_SNAPSHOT_RETENTION);

        let failed_actor_count = self
            .actor_states
            .values()
            .filter(|s| s.has_errors())
            .count();

        // NOTE(review): `children` now also contains terminated actors, so NUM_ACTORS
        // counts more than live actors despite the variable name — confirm intent.
        let num_live = children.len();
        let mut attrs = hyperactor_config::Attrs::new();
        attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
        attrs.set(
            crate::introspect::PROC_NAME,
            self.proc.proc_id().to_string(),
        );
        attrs.set(crate::introspect::NUM_ACTORS, num_live);
        attrs.set(hyperactor::introspect::CHILDREN, children);
        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
        attrs.set(crate::introspect::STOPPED_CHILDREN, stopped_children);
        attrs.set(
            crate::introspect::STOPPED_RETENTION_CAP,
            stopped_retention_cap,
        );
        attrs.set(crate::introspect::IS_POISONED, failed_actor_count > 0);
        attrs.set(crate::introspect::FAILED_ACTOR_COUNT, failed_actor_count);
        cx.instance().publish_attrs(attrs);
    }
}
572
#[async_trait]
impl Actor for ProcAgent {
    /// Initializes the agent. It:
    /// - marks itself as a system actor;
    /// - becomes the proc's supervision coordinator;
    /// - publishes initial introspection data;
    /// - installs the child-query handler;
    /// - schedules the first orphan self-check when that check is enabled.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        this.set_system();
        self.proc.set_supervision_coordinator(this.port())?;
        self.publish_introspect_properties(this);

        let proc = self.proc.clone();
        let self_id = this.self_id().clone();
        // Answers introspection queries about children of this agent. Possible answers:
        // a terminated actor's retained snapshot, this proc itself, or a "not_found"
        // error result.
        this.set_query_child_handler(move |child_ref| {
            use hyperactor::introspect::IntrospectResult;

            // Terminated actors are answered from their retained snapshot.
            if let hyperactor::reference::Reference::Actor(id) = child_ref {
                if let Some(snapshot) = proc.terminated_snapshot(id) {
                    return snapshot;
                }
            }

            if let hyperactor::reference::Reference::Proc(proc_id) = child_ref {
                // Only this agent's own proc is answered. The payload largely mirrors
                // `publish_introspect_properties`. The difference is that the poison flags
                // are read back from the agent's last published attributes, because this
                // closure has no access to `actor_states`.
                if proc_id == proc.proc_id() {
                    let (mut children, mut system_children) = collect_live_children(&proc);

                    let mut stopped_children: Vec<crate::introspect::NodeRef> = Vec::new();
                    for id in proc.all_terminated_actor_ids() {
                        let child_ref = hyperactor::introspect::IntrospectRef::Actor(id.clone());
                        let node_ref = crate::introspect::NodeRef::Actor(id.clone());
                        stopped_children.push(node_ref.clone());
                        if let Some(snapshot) = proc.terminated_snapshot(&id) {
                            let snapshot_attrs: hyperactor_config::Attrs =
                                serde_json::from_str(&snapshot.attrs).unwrap_or_default();
                            if snapshot_attrs
                                .get(hyperactor::introspect::IS_SYSTEM)
                                .copied()
                                .unwrap_or(false)
                            {
                                system_children.push(node_ref);
                            }
                        }
                        if !children.contains(&child_ref) {
                            children.push(child_ref);
                        }
                    }

                    let stopped_retention_cap = hyperactor_config::global::get(
                        hyperactor::config::TERMINATED_SNAPSHOT_RETENTION,
                    );

                    // Defaults to "not poisoned" if the agent has not published attrs yet.
                    let (is_poisoned, failed_actor_count) = proc
                        .get_instance(&self_id)
                        .and_then(|cell| cell.published_attrs())
                        .map(|attrs| {
                            let is_poisoned = attrs
                                .get(crate::introspect::IS_POISONED)
                                .copied()
                                .unwrap_or(false);
                            let failed_actor_count = attrs
                                .get(crate::introspect::FAILED_ACTOR_COUNT)
                                .copied()
                                .unwrap_or(0);
                            (is_poisoned, failed_actor_count)
                        })
                        .unwrap_or((false, 0));

                    // NOTE(review): includes terminated actors as well — see `children`.
                    let num_live = children.len();
                    let mut attrs = hyperactor_config::Attrs::new();
                    attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
                    attrs.set(crate::introspect::PROC_NAME, proc_id.to_string());
                    attrs.set(crate::introspect::NUM_ACTORS, num_live);
                    attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
                    attrs.set(crate::introspect::STOPPED_CHILDREN, stopped_children);
                    attrs.set(
                        crate::introspect::STOPPED_RETENTION_CAP,
                        stopped_retention_cap,
                    );
                    attrs.set(crate::introspect::IS_POISONED, is_poisoned);
                    attrs.set(crate::introspect::FAILED_ACTOR_COUNT, failed_actor_count);
                    let attrs_json =
                        serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());

                    return IntrospectResult {
                        identity: hyperactor::introspect::IntrospectRef::Proc(proc_id.clone()),
                        attrs: attrs_json,
                        children,
                        parent: None,
                        as_of: std::time::SystemTime::now(),
                    };
                }
            }

            // Anything else is unknown: reply with a "not_found" error result. A port
            // reference is reported under its owning actor's identity.
            {
                let mut error_attrs = hyperactor_config::Attrs::new();
                error_attrs.set(hyperactor::introspect::ERROR_CODE, "not_found".to_string());
                error_attrs.set(
                    hyperactor::introspect::ERROR_MESSAGE,
                    format!("child {} not found", child_ref),
                );
                let identity = match child_ref {
                    hyperactor::reference::Reference::Proc(id) => {
                        hyperactor::introspect::IntrospectRef::Proc(id.clone())
                    }
                    hyperactor::reference::Reference::Actor(id) => {
                        hyperactor::introspect::IntrospectRef::Actor(id.clone())
                    }
                    hyperactor::reference::Reference::Port(id) => {
                        hyperactor::introspect::IntrospectRef::Actor(id.actor_id().clone())
                    }
                };
                IntrospectResult {
                    identity,
                    attrs: serde_json::to_string(&error_attrs).unwrap_or_else(|_| "{}".to_string()),
                    children: Vec::new(),
                    parent: None,
                    as_of: std::time::SystemTime::now(),
                }
            }
        });

        // Kick off the periodic orphan check; `Handler<SelfCheck>` re-arms it.
        if let Some(delay) = &self.mesh_orphan_timeout {
            this.self_message_with_delay(SelfCheck::default(), *delay)?;
        }
        Ok(())
    }

    /// Handles messages this agent sent that could not be delivered.
    ///
    /// A state update tagged with `STREAM_STATE_SUBSCRIBER` means the subscriber is gone,
    /// so its port is removed from every tracked actor. All other undeliverable messages
    /// get the default handling.
    async fn handle_undeliverable_message(
        &mut self,
        cx: &Instance<Self>,
        envelope: Undeliverable<MessageEnvelope>,
    ) -> Result<(), anyhow::Error> {
        if let Some(true) = envelope.0.headers().get(STREAM_STATE_SUBSCRIBER) {
            let dest_port_id = envelope.0.dest().clone();
            let port =
                hyperactor_reference::PortRef::<resource::State<ActorState>>::attest(dest_port_id);
            for instance in self.actor_states.values_mut() {
                instance.subscribers.retain(|s| s != &port);
            }
            Ok(())
        } else {
            handle_undeliverable_message(cx, envelope)
        }
    }
}
725
726#[async_trait]
727#[hyperactor::handle(MeshAgentMessage)]
728impl MeshAgentMessageHandler for ProcAgent {
729 async fn configure(
730 &mut self,
731 cx: &Context<Self>,
732 rank: usize,
733 forwarder: ChannelAddr,
734 supervisor: Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>,
735 address_book: HashMap<hyperactor_reference::ProcId, ChannelAddr>,
736 configured: hyperactor_reference::PortRef<usize>,
737 record_supervision_events: bool,
738 ) -> Result<(), anyhow::Error> {
739 anyhow::ensure!(
740 self.state.is_unconfigured_v0(),
741 "mesh agent cannot be (re-)configured"
742 );
743 self.record_supervision_events = record_supervision_events;
744
745 let client = MailboxClient::new(channel::dial(forwarder)?);
746 let router =
747 DialMailboxRouter::new_with_default_direct_addressed_remote_only(client.into_boxed());
748
749 for (proc_id, addr) in address_book {
750 router.bind(proc_id.into(), addr);
751 }
752
753 let sender = take(&mut self.state).into_unconfigured_v0().unwrap();
754 assert!(sender.configure(router.into_boxed()));
755
756 self.state = State::ConfiguredV0 {
760 sender,
761 rank,
762 supervisor,
763 };
764 configured.send(cx, rank)?;
765
766 Ok(())
767 }
768
769 async fn gspawn(
770 &mut self,
771 cx: &Context<Self>,
772 actor_type: String,
773 actor_name: String,
774 params_data: Data,
775 status_port: hyperactor_reference::PortRef<GspawnResult>,
776 ) -> Result<(), anyhow::Error> {
777 anyhow::ensure!(
778 self.state.is_configured_v0(),
779 "mesh agent is not v0 configured"
780 );
781 let actor_id = match self
782 .remote
783 .gspawn(
784 &self.proc,
785 &actor_type,
786 &actor_name,
787 params_data,
788 cx.headers().clone(),
789 )
790 .await
791 {
792 Ok(id) => id,
793 Err(err) => {
794 status_port.send(cx, GspawnResult::Error(format!("gspawn failed: {}", err)))?;
795 return Err(anyhow::anyhow!("gspawn failed"));
796 }
797 };
798 status_port.send(
799 cx,
800 GspawnResult::Success {
801 rank: self.state.rank().unwrap(),
802 actor_id,
803 },
804 )?;
805 self.publish_introspect_properties(cx);
806 Ok(())
807 }
808
809 async fn status(
810 &mut self,
811 cx: &Context<Self>,
812 status_port: hyperactor_reference::PortRef<(usize, bool)>,
813 ) -> Result<(), anyhow::Error> {
814 match &self.state {
815 State::ConfiguredV0 { rank, .. } => {
816 status_port.send(cx, (*rank, true))?;
818 Ok(())
819 }
820 State::UnconfiguredV0 { .. } => {
821 Err(anyhow::anyhow!(
823 "status unavailable: v0 agent not configured (waiting for Configure)"
824 ))
825 }
826 State::V1 => {
827 Err(anyhow::anyhow!(
829 "status unsupported in v1/owned path (no rank)"
830 ))
831 }
832 State::Invalid => Err(anyhow::anyhow!(
833 "status unavailable: agent in invalid state"
834 )),
835 }
836 }
837}
838
#[async_trait]
impl Handler<ActorSupervisionEvent> for ProcAgent {
    /// Handles a supervision event for an actor on this proc. The agent registers itself
    /// as the proc's supervision coordinator in `init`.
    ///
    /// When recording is enabled, the handler:
    /// - logs the event;
    /// - attaches it to the matching tracked actor, bumping its generation and notifying
    ///   subscribers and waiters;
    /// - schedules an introspection republish;
    /// - during `StopAll`, shuts the proc down once every tracked actor is terminal.
    ///
    /// Afterwards the event is forwarded to the v0 supervisor if one is configured. An
    /// error event that is neither recorded nor forwardable makes the process exit with
    /// status 1.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        event: ActorSupervisionEvent,
    ) -> anyhow::Result<()> {
        if self.record_supervision_events {
            if event.is_error() {
                tracing::warn!(
                    name = "SupervisionEvent",
                    proc_id = %self.proc.proc_id(),
                    %event,
                    "recording supervision error",
                );
            } else {
                tracing::debug!(
                    name = "SupervisionEvent",
                    proc_id = %self.proc.proc_id(),
                    %event,
                    "recording non-error supervision event",
                );
            }
            // Attach the event to the tracked actor it concerns (matched by spawned id).
            if let Some((name, instance)) = self
                .actor_states
                .iter_mut()
                .find(|(_, s)| s.spawn.as_ref().ok() == Some(&event.actor_id))
            {
                instance.supervision_event = Some(event.clone());
                instance.generation += 1;
                let name = name.clone();
                instance.notify_status_changed(cx, &name);
            }
            // Coalesce republishes: at most one RepublishIntrospect is pending, sent after
            // a 100ms delay. NOTE(review): if scheduling fails, `introspect_dirty` stays
            // set and later events will not reschedule a republish.
            if !self.introspect_dirty {
                self.introspect_dirty = true;
                let _ = cx.self_message_with_delay(
                    RepublishIntrospect,
                    std::time::Duration::from_millis(100),
                );
            }

            // Finish a pending StopAll once the last tracked actor has terminated.
            if self.stopping_all && self.all_actors_terminal() {
                self.shutdown().await;
            }
        }
        if let Some(supervisor) = self.state.supervisor() {
            supervisor.send(cx, event)?;
        } else if !self.record_supervision_events && event.is_error() {
            // The error can be neither recorded nor propagated; crash rather than lose it.
            tracing::error!(
                name = "supervision_event_transmit_failed",
                proc_id = %cx.self_id().proc_id(),
                %event,
                "could not propagate supervision event, crashing",
            );

            std::process::exit(1);
        }
        Ok(())
    }
}
909
910#[async_trait]
911impl Handler<RepublishIntrospect> for ProcAgent {
912 async fn handle(&mut self, cx: &Context<Self>, _: RepublishIntrospect) -> anyhow::Result<()> {
913 if self.introspect_dirty {
914 self.introspect_dirty = false;
915 self.publish_introspect_properties(cx);
916 }
917 Ok(())
918 }
919}
920
921#[async_trait]
922impl Handler<PySpyDump> for ProcAgent {
923 async fn handle(
924 &mut self,
925 cx: &Context<Self>,
926 message: PySpyDump,
927 ) -> Result<(), anyhow::Error> {
928 PySpyWorker::spawn_and_forward(cx, message.opts, message.result)
929 }
930}
931
932#[async_trait]
933impl Handler<ConfigDump> for ProcAgent {
934 async fn handle(
935 &mut self,
936 cx: &Context<Self>,
937 message: ConfigDump,
938 ) -> Result<(), anyhow::Error> {
939 let entries = hyperactor_config::global::config_entries();
940 let _ = message.result.send(cx, ConfigDumpResult { entries });
943 Ok(())
944 }
945}
946
/// Specification for creating an actor through `resource::CreateOrUpdate`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct ActorSpec {
    /// Actor type name, resolved through the agent's `Remote` registry.
    pub actor_type: String,
    /// Encoded spawn parameters, passed through to `Remote::gspawn`.
    pub params_data: Data,
}
wirevalue::register_type!(ActorSpec);
958
/// Per-actor payload of `resource::State`, present only for successfully spawned actors.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct ActorState {
    /// Id of the spawned actor.
    pub actor_id: hyperactor_reference::ActorId,
    /// Rank the actor was created with.
    pub create_rank: usize,
    /// Supervision events recorded for the actor (currently at most one).
    pub supervision_events: Vec<ActorSupervisionEvent>,
}
wirevalue::register_type!(ActorState);
970
971#[async_trait]
972impl Handler<resource::CreateOrUpdate<ActorSpec>> for ProcAgent {
973 async fn handle(
974 &mut self,
975 cx: &Context<Self>,
976 create_or_update: resource::CreateOrUpdate<ActorSpec>,
977 ) -> anyhow::Result<()> {
978 if self.actor_states.contains_key(&create_or_update.name) {
979 return Ok(());
981 }
982 let create_rank = create_or_update.rank.unwrap();
983 if self.actor_states.values().any(|s| s.has_errors()) {
987 self.actor_states.insert(
988 create_or_update.name.clone(),
989 ActorInstanceState {
990 spawn: Err(anyhow::anyhow!(
991 "Cannot spawn new actors on mesh with supervision events"
992 )),
993 create_rank,
994 stop_initiated: false,
995 supervision_event: None,
996 subscribers: Vec::new(),
997 expiry_time: None,
998 generation: 1,
999 pending_wait_status: Vec::new(),
1000 },
1001 );
1002 return Ok(());
1003 }
1004
1005 let ActorSpec {
1006 actor_type,
1007 params_data,
1008 } = create_or_update.spec;
1009 self.actor_states.insert(
1010 create_or_update.name.clone(),
1011 ActorInstanceState {
1012 create_rank,
1013 spawn: self
1014 .remote
1015 .gspawn(
1016 &self.proc,
1017 &actor_type,
1018 &create_or_update.name.to_string(),
1019 params_data,
1020 cx.headers().clone(),
1021 )
1022 .await,
1023 stop_initiated: false,
1024 supervision_event: None,
1025 subscribers: Vec::new(),
1026 expiry_time: None,
1027 generation: 1,
1028 pending_wait_status: Vec::new(),
1029 },
1030 );
1031
1032 self.publish_introspect_properties(cx);
1033 Ok(())
1034 }
1035}
1036
1037#[async_trait]
1038impl Handler<resource::Stop> for ProcAgent {
1039 async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
1040 let actor_id = match self.actor_states.get_mut(&message.name) {
1041 Some(actor_state) => {
1042 let id = actor_state.spawn.as_ref().ok().cloned();
1043 if id.is_some() && !actor_state.stop_initiated {
1044 actor_state.stop_initiated = true;
1045 actor_state.generation += 1;
1046 actor_state.notify_status_changed(cx, &message.name);
1047 id
1048 } else {
1049 None
1050 }
1051 }
1052 None => None,
1053 };
1054 if let Some(actor_id) = actor_id {
1055 self.stop_actor_by_id(&actor_id, &message.reason);
1056 }
1057
1058 Ok(())
1059 }
1060}
1061
1062#[async_trait]
1066impl Handler<resource::StopAll> for ProcAgent {
1067 async fn handle(
1068 &mut self,
1069 _cx: &Context<Self>,
1070 message: resource::StopAll,
1071 ) -> anyhow::Result<()> {
1072 self.stopping_all = true;
1073
1074 let to_stop: Vec<hyperactor_reference::ActorId> = self
1076 .actor_states
1077 .values_mut()
1078 .filter_map(|state| {
1079 if state.stop_initiated {
1080 return None;
1081 }
1082 state.stop_initiated = true;
1083 state.spawn.as_ref().ok().cloned()
1084 })
1085 .collect();
1086
1087 for actor_id in &to_stop {
1088 self.stop_actor_by_id(actor_id, &message.reason);
1089 }
1090
1091 if self.all_actors_terminal() {
1093 self.shutdown().await;
1094 }
1095
1096 Ok(())
1097 }
1098}
1099
1100#[async_trait]
1101impl Handler<resource::GetRankStatus> for ProcAgent {
1102 async fn handle(
1103 &mut self,
1104 cx: &Context<Self>,
1105 get_rank_status: resource::GetRankStatus,
1106 ) -> anyhow::Result<()> {
1107 use crate::StatusOverlay;
1108 use crate::resource::Status;
1109
1110 let (rank, status) = match self.actor_states.get(&get_rank_status.name) {
1111 Some(state) => (state.create_rank, state.status()),
1112 None => (usize::MAX, Status::NotExist),
1113 };
1114
1115 let overlay = if rank == usize::MAX {
1118 StatusOverlay::new()
1119 } else {
1120 StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
1121 .expect("valid single-run overlay")
1122 };
1123 let result = get_rank_status.reply.send(cx, overlay);
1124 if let Err(e) = result {
1128 tracing::warn!(
1129 actor = %cx.self_id(),
1130 "failed to send GetRankStatus reply to {} due to error: {}",
1131 get_rank_status.reply.port_id().actor_id(),
1132 e
1133 );
1134 }
1135 Ok(())
1136 }
1137}
1138
1139#[async_trait]
1140impl Handler<resource::WaitRankStatus> for ProcAgent {
1141 async fn handle(
1142 &mut self,
1143 cx: &Context<Self>,
1144 msg: resource::WaitRankStatus,
1145 ) -> anyhow::Result<()> {
1146 use crate::StatusOverlay;
1147 use crate::resource::Status;
1148
1149 let (rank, status) = match self.actor_states.get(&msg.name) {
1150 Some(state) => (state.create_rank, state.status()),
1151 None => (usize::MAX, Status::NotExist),
1152 };
1153
1154 if status >= msg.min_status || rank == usize::MAX {
1156 let overlay = if rank == usize::MAX {
1157 StatusOverlay::new()
1158 } else {
1159 StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
1160 .expect("valid single-run overlay")
1161 };
1162 let _ = msg.reply.send(cx, overlay);
1163 return Ok(());
1164 }
1165
1166 if let Some(state) = self.actor_states.get_mut(&msg.name) {
1169 state.pending_wait_status.push((msg.min_status, msg.reply));
1170 }
1171 Ok(())
1172 }
1173}
1174
1175#[async_trait]
1176impl Handler<resource::GetState<ActorState>> for ProcAgent {
1177 async fn handle(
1178 &mut self,
1179 cx: &Context<Self>,
1180 get_state: resource::GetState<ActorState>,
1181 ) -> anyhow::Result<()> {
1182 let state = match self.actor_states.get(&get_state.name) {
1183 Some(instance) => instance.to_state(&get_state.name),
1184 None => resource::State {
1185 name: get_state.name.clone(),
1186 status: resource::Status::NotExist,
1187 state: None,
1188 generation: 0,
1189 timestamp: std::time::SystemTime::now(),
1190 },
1191 };
1192
1193 let result = get_state.reply.send(cx, state);
1194 if let Err(e) = result {
1195 tracing::warn!(
1196 actor = %cx.self_id(),
1197 "failed to send GetState reply to {} due to error: {}",
1198 get_state.reply.port_id().actor_id(),
1199 e
1200 );
1201 }
1202 Ok(())
1203 }
1204}
1205
1206#[async_trait]
1207impl Handler<resource::StreamState<ActorState>> for ProcAgent {
1208 async fn handle(
1209 &mut self,
1210 cx: &Context<Self>,
1211 stream_state: resource::StreamState<ActorState>,
1212 ) -> anyhow::Result<()> {
1213 let state = match self.actor_states.get_mut(&stream_state.name) {
1214 Some(instance) => {
1215 let state = instance.to_state(&stream_state.name);
1216 instance.subscribers.push(stream_state.subscriber.clone());
1217 state
1218 }
1219 None => resource::State {
1220 name: stream_state.name.clone(),
1221 status: resource::Status::NotExist,
1222 state: None,
1223 generation: 0,
1224 timestamp: std::time::SystemTime::now(),
1225 },
1226 };
1227
1228 let mut headers = Flattrs::new();
1230 headers.set(STREAM_STATE_SUBSCRIBER, true);
1231 if let Err(e) = stream_state
1232 .subscriber
1233 .send_with_headers(cx, headers, state)
1234 {
1235 tracing::warn!(
1236 actor = %cx.self_id(),
1237 "failed to send initial StreamState to {}: {}",
1238 stream_state.subscriber.port_id().actor_id(),
1239 e,
1240 );
1241 }
1242 Ok(())
1243 }
1244}
1245
1246#[async_trait]
1247impl Handler<resource::KeepaliveGetState<ActorState>> for ProcAgent {
1248 async fn handle(
1249 &mut self,
1250 cx: &Context<Self>,
1251 message: resource::KeepaliveGetState<ActorState>,
1252 ) -> anyhow::Result<()> {
1253 if let Ok(instance_state) = self
1255 .actor_states
1256 .get_mut(&message.get_state.name)
1257 .ok_or_else(|| {
1258 anyhow::anyhow!(
1259 "attempting to register a keepalive for an actor that doesn't exist: {}",
1260 message.get_state.name
1261 )
1262 })
1263 {
1264 instance_state.expiry_time = Some(message.expires_after);
1265 }
1266
1267 <Self as Handler<resource::GetState<ActorState>>>::handle(self, cx, message.get_state).await
1269 }
1270}
1271
/// Request for a new client `Instance` created on the agent's proc.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct NewClientInstance {
    /// Port that receives the newly created instance.
    #[reply]
    pub client_instance: PortHandle<Instance<()>>,
}
1279
1280#[async_trait]
1281impl Handler<NewClientInstance> for ProcAgent {
1282 async fn handle(
1283 &mut self,
1284 cx: &Context<Self>,
1285 NewClientInstance { client_instance }: NewClientInstance,
1286 ) -> anyhow::Result<()> {
1287 let (instance, _handle) = self.proc.instance("client")?;
1288 client_instance.send(cx, instance)?;
1289 Ok(())
1290 }
1291}
1292
/// Request for the agent's `Proc`.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct GetProc {
    /// Port that receives a clone of the agent's proc.
    #[reply]
    pub proc: PortHandle<Proc>,
}
1300
1301#[async_trait]
1302impl Handler<GetProc> for ProcAgent {
1303 async fn handle(
1304 &mut self,
1305 cx: &Context<Self>,
1306 GetProc { proc }: GetProc,
1307 ) -> anyhow::Result<()> {
1308 proc.send(cx, self.proc.clone())?;
1309 Ok(())
1310 }
1311}
1312
1313#[async_trait]
1314impl Handler<SelfCheck> for ProcAgent {
1315 async fn handle(&mut self, cx: &Context<Self>, _: SelfCheck) -> anyhow::Result<()> {
1316 let Some(duration) = &self.mesh_orphan_timeout else {
1322 return Ok(());
1323 };
1324 let duration = duration.clone();
1325 let now = std::time::SystemTime::now();
1326
1327 let expired: Vec<(Name, hyperactor_reference::ActorId)> = self
1329 .actor_states
1330 .iter()
1331 .filter_map(|(name, state)| {
1332 let expiry = state.expiry_time?;
1333 if now > expiry && !state.stop_initiated {
1335 if let Ok(actor_id) = &state.spawn {
1336 return Some((name.clone(), actor_id.clone()));
1337 }
1338 }
1339 None
1340 })
1341 .collect();
1342
1343 if !expired.is_empty() {
1344 tracing::info!(
1345 "stopping {} orphaned actors past their keepalive expiry",
1346 expired.len(),
1347 );
1348 }
1349
1350 for (name, actor_id) in expired {
1351 if let Some(state) = self.actor_states.get_mut(&name) {
1352 state.stop_initiated = true;
1353 }
1354 self.stop_actor_by_id(&actor_id, "orphaned");
1355 }
1356
1357 cx.self_message_with_delay(SelfCheck::default(), duration)?;
1359 Ok(())
1360 }
1361}
1362
/// A `MailboxSender` that buffers posted messages until a real sender is
/// installed with `configure`, and forwards directly to that sender afterwards.
#[derive(Clone)]
pub(crate) struct ReconfigurableMailboxSender {
    // Shared via `Arc`: every clone of this sender observes the same queue and,
    // once configured, the same underlying sender.
    state: Arc<RwLock<ReconfigurableMailboxSenderState>>,
}
1369
1370impl std::fmt::Debug for ReconfigurableMailboxSender {
1371 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1372 f.debug_struct("ReconfigurableMailboxSender").finish()
1375 }
1376}
1377
/// Borrowed view of a `ReconfigurableMailboxSender`'s state, obtained from
/// `ReconfigurableMailboxSender::as_inner`, which only hands one out when the
/// sender is configured.
///
/// It holds the state's read lock for as long as it lives, so `configure`
/// (which takes the write lock) blocks until it is dropped.
/// NOTE(review): calling `configure` on the same thread while holding this
/// value would therefore deadlock or panic — keep its lifetime short.
pub(crate) struct ReconfigurableMailboxSenderInner<'a> {
    guard: RwLockReadGuard<'a, ReconfigurableMailboxSenderState>,
}
1393
1394impl<'a> ReconfigurableMailboxSenderInner<'a> {
1395 pub(crate) fn as_configured(&self) -> Option<&BoxedMailboxSender> {
1396 self.guard.as_configured()
1397 }
1398}
1399
/// A message buffered before configuration: the envelope together with the
/// `return_handle` it was posted with.
type Post = (MessageEnvelope, PortHandle<Undeliverable<MessageEnvelope>>);
1401
/// Internal state of a `ReconfigurableMailboxSender`.
///
/// `EnumAsInner` supplies the `is_configured` / `as_configured` /
/// `into_queueing` accessors used by the sender.
#[derive(EnumAsInner, Debug)]
enum ReconfigurableMailboxSenderState {
    /// Not yet configured: posted messages are appended here in arrival order.
    Queueing(Mutex<Vec<Post>>),
    /// Configured: messages are forwarded directly to this sender.
    Configured(BoxedMailboxSender),
}
1407
1408impl ReconfigurableMailboxSender {
1409 pub(crate) fn new() -> Self {
1410 Self {
1411 state: Arc::new(RwLock::new(ReconfigurableMailboxSenderState::Queueing(
1412 Mutex::new(Vec::new()),
1413 ))),
1414 }
1415 }
1416
1417 pub(crate) fn configure(&self, sender: BoxedMailboxSender) -> bool {
1421 let mut state = self.state.write().unwrap();
1423 if state.is_configured() {
1424 return false;
1425 }
1426
1427 let queued = std::mem::replace(
1429 &mut *state,
1430 ReconfigurableMailboxSenderState::Configured(sender),
1431 );
1432
1433 let configured_sender = state.as_configured().expect("just configured");
1436
1437 for (envelope, return_handle) in queued.into_queueing().unwrap().into_inner().unwrap() {
1439 configured_sender.post(envelope, return_handle);
1440 }
1441
1442 true
1443 }
1444
1445 pub(crate) fn as_inner<'a>(
1446 &'a self,
1447 ) -> Result<ReconfigurableMailboxSenderInner<'a>, anyhow::Error> {
1448 let state = self.state.read().unwrap();
1449 if state.is_configured() {
1450 Ok(ReconfigurableMailboxSenderInner { guard: state })
1451 } else {
1452 Err(anyhow::anyhow!("cannot get inner sender: not configured"))
1453 }
1454 }
1455}
1456
1457#[async_trait]
1458impl MailboxSender for ReconfigurableMailboxSender {
1459 fn post(
1460 &self,
1461 envelope: MessageEnvelope,
1462 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1463 ) {
1464 match &*self.state.read().unwrap() {
1465 ReconfigurableMailboxSenderState::Queueing(queue) => {
1466 queue.lock().unwrap().push((envelope, return_handle));
1467 }
1468 ReconfigurableMailboxSenderState::Configured(sender) => {
1469 sender.post(envelope, return_handle);
1470 }
1471 }
1472 }
1473
1474 fn post_unchecked(
1475 &self,
1476 envelope: MessageEnvelope,
1477 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1478 ) {
1479 match &*self.state.read().unwrap() {
1480 ReconfigurableMailboxSenderState::Queueing(queue) => {
1481 queue.lock().unwrap().push((envelope, return_handle));
1482 }
1483 ReconfigurableMailboxSenderState::Configured(sender) => {
1484 sender.post_unchecked(envelope, return_handle);
1485 }
1486 }
1487 }
1488
1489 async fn flush(&self) -> Result<(), anyhow::Error> {
1490 let sender = match &*self.state.read().unwrap() {
1491 ReconfigurableMailboxSenderState::Queueing(_) => return Ok(()),
1492 ReconfigurableMailboxSenderState::Configured(sender) => sender.clone(),
1493 };
1494 sender.flush().await
1495 }
1496}
1497
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::Mutex;

    use hyperactor::mailbox::BoxedMailboxSender;
    use hyperactor::mailbox::Mailbox;
    use hyperactor::mailbox::MailboxSender;
    use hyperactor::mailbox::MessageEnvelope;
    use hyperactor::mailbox::PortHandle;
    use hyperactor::mailbox::Undeliverable;
    use hyperactor::testing::ids::test_actor_id;
    use hyperactor::testing::ids::test_port_id;
    use hyperactor_config::Flattrs;

    use super::*;

    /// Test sink that records every envelope passed to `post_unchecked`, in order.
    #[derive(Debug, Clone)]
    struct QueueingMailboxSender {
        messages: Arc<Mutex<Vec<MessageEnvelope>>>,
    }

    impl QueueingMailboxSender {
        fn new() -> Self {
            Self {
                messages: Arc::new(Mutex::new(Vec::new())),
            }
        }

        /// Snapshot of the envelopes recorded so far.
        fn get_messages(&self) -> Vec<MessageEnvelope> {
            self.messages.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl MailboxSender for QueueingMailboxSender {
        fn post_unchecked(
            &self,
            envelope: MessageEnvelope,
            _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
        ) {
            self.messages.lock().unwrap().push(envelope);
        }
    }

    /// Builds a test envelope whose payload is the serialized `data`.
    fn envelope(data: u64) -> MessageEnvelope {
        MessageEnvelope::serialize(
            test_actor_id("world_0", "sender"),
            test_port_id("world_0", "receiver", 1),
            &data,
            Flattrs::new(),
        )
        .unwrap()
    }

    /// Returns a return-handle port opened on a detached mailbox; its receiver
    /// is dropped when this function returns.
    fn return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
        let (port, _receiver) = mbox.open_port::<Undeliverable<MessageEnvelope>>();
        port
    }

    /// Messages posted before `configure` are held back, then delivered in order.
    #[test]
    fn test_queueing_before_configure() {
        let sender = ReconfigurableMailboxSender::new();

        let test_sender = QueueingMailboxSender::new();
        let boxed_sender = BoxedMailboxSender::new(test_sender.clone());

        let return_handle = return_handle();
        sender.post(envelope(1), return_handle.clone());
        sender.post(envelope(2), return_handle.clone());

        assert_eq!(test_sender.get_messages().len(), 0);

        assert!(sender.configure(boxed_sender));

        let messages = test_sender.get_messages();
        assert_eq!(messages.len(), 2);

        assert_eq!(messages[0].deserialized::<u64>().unwrap(), 1);
        assert_eq!(messages[1].deserialized::<u64>().unwrap(), 2);
    }

    /// Messages posted after `configure` go straight to the configured sender.
    #[test]
    fn test_direct_delivery_after_configure() {
        let sender = ReconfigurableMailboxSender::new();

        let test_sender = QueueingMailboxSender::new();
        let boxed_sender = BoxedMailboxSender::new(test_sender.clone());
        assert!(sender.configure(boxed_sender));

        let return_handle = return_handle();
        sender.post(envelope(3), return_handle.clone());
        sender.post(envelope(4), return_handle.clone());

        let messages = test_sender.get_messages();
        assert_eq!(messages.len(), 2);

        assert_eq!(messages[0].deserialized::<u64>().unwrap(), 3);
        assert_eq!(messages[1].deserialized::<u64>().unwrap(), 4);
    }

    /// Only the first `configure` succeeds; later calls report `false`.
    #[test]
    fn test_multiple_configurations() {
        let sender = ReconfigurableMailboxSender::new();
        let boxed_sender = BoxedMailboxSender::new(QueueingMailboxSender::new());

        assert!(sender.configure(boxed_sender.clone()));
        assert!(!sender.configure(boxed_sender));
    }

    /// Queued messages are delivered before any message posted after `configure`.
    #[test]
    fn test_mixed_queueing_and_direct_delivery() {
        let sender = ReconfigurableMailboxSender::new();

        let test_sender = QueueingMailboxSender::new();
        let boxed_sender = BoxedMailboxSender::new(test_sender.clone());

        let return_handle = return_handle();
        sender.post(envelope(5), return_handle.clone());
        sender.post(envelope(6), return_handle.clone());

        assert!(sender.configure(boxed_sender));

        sender.post(envelope(7), return_handle.clone());
        sender.post(envelope(8), return_handle.clone());

        let messages = test_sender.get_messages();
        assert_eq!(messages.len(), 4);

        assert_eq!(messages[0].deserialized::<u64>().unwrap(), 5);
        assert_eq!(messages[1].deserialized::<u64>().unwrap(), 6);
        assert_eq!(messages[2].deserialized::<u64>().unwrap(), 7);
        assert_eq!(messages[3].deserialized::<u64>().unwrap(), 8);
    }

    #[derive(Debug, Default, Serialize, Deserialize)]
    #[hyperactor::export(handlers = [])]
    struct ExtraActor;
    impl hyperactor::Actor for ExtraActor {}
    hyperactor::remote!(ExtraActor);

    /// `QueryChild(Proc)` sent to the proc agent reports `node_type=proc` and
    /// lists as children actors spawned directly on the proc (not via the agent).
    #[tokio::test]
    async fn test_query_child_proc_returns_live_children() {
        use hyperactor::Proc;
        use hyperactor::actor::ActorStatus;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::introspect::IntrospectMessage;
        use hyperactor::introspect::IntrospectResult;
        use hyperactor::reference as hyperactor_reference;

        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();

        agent_handle
            .status()
            .wait_for(|s| matches!(s, ActorStatus::Idle))
            .await
            .unwrap();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.instance("client").unwrap();

        let agent_id = proc.proc_id().actor_id(PROC_AGENT_ACTOR_NAME, 0);
        let port =
            hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(&agent_id);

        // Sends one QueryChild(Proc) request and returns the receiver for its reply.
        let query = |client: &hyperactor::Instance<()>| {
            let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
            port.send(
                client,
                IntrospectMessage::QueryChild {
                    child_ref: hyperactor_reference::Reference::Proc(proc.proc_id().clone()),
                    reply: reply_port.bind(),
                },
            )
            .unwrap();
            reply_rx
        };
        // Awaits a reply, failing the test rather than hanging if none arrives.
        let recv = |rx: hyperactor::mailbox::OncePortReceiver<IntrospectResult>| async move {
            tokio::time::timeout(std::time::Duration::from_secs(5), rx.recv())
                .await
                .expect("QueryChild(Proc) timed out — reply never delivered")
                .expect("reply channel closed")
        };

        let payload = recv(query(&client)).await;
        let attrs: hyperactor_config::Attrs =
            serde_json::from_str(&payload.attrs).expect("valid attrs JSON");
        assert_eq!(
            attrs.get(crate::introspect::NODE_TYPE).map(String::as_str),
            Some("proc"),
            "expected node_type=proc in attrs, got {:?}",
            payload.attrs
        );
        assert!(
            payload
                .children
                .iter()
                .any(|c| c.to_string().contains(PROC_AGENT_ACTOR_NAME)),
            "initial children {:?} should contain proc_agent",
            payload.children
        );
        let initial_count = payload.children.len();

        proc.spawn("extra_actor", ExtraActor).unwrap();

        let payload2 = recv(query(&client)).await;
        let attrs2: hyperactor_config::Attrs =
            serde_json::from_str(&payload2.attrs).expect("valid attrs JSON");
        assert_eq!(
            attrs2.get(crate::introspect::NODE_TYPE).map(String::as_str),
            Some("proc"),
            "expected node_type=proc in attrs, got {:?}",
            payload2.attrs
        );
        assert!(
            payload2
                .children
                .iter()
                .any(|c| c.to_string().contains("extra_actor")),
            "after direct spawn, children {:?} should contain extra_actor",
            payload2.children
        );
        assert!(
            payload2.children.len() > initial_count,
            "expected at least {} children after direct spawn, got {:?}",
            initial_count + 1,
            payload2.children
        );
    }

    /// Rapid spawn/stop churn on the proc must complete within the deadline while
    /// another task continuously issues `QueryChild(Proc)`. The agent must still
    /// answer a query afterwards.
    #[tokio::test]
    async fn test_rapid_spawn_stop_does_not_stall_proc_agent() {
        use std::sync::atomic::AtomicUsize;
        use std::sync::atomic::Ordering;

        use hyperactor::Proc;
        use hyperactor::actor::ActorStatus;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::introspect::IntrospectMessage;
        use hyperactor::introspect::IntrospectResult;
        use hyperactor::reference as hyperactor_reference;

        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();

        agent_handle
            .status()
            .wait_for(|s| matches!(s, ActorStatus::Idle))
            .await
            .unwrap();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.instance("client").unwrap();

        let agent_id = proc.proc_id().actor_id(PROC_AGENT_ACTOR_NAME, 0);
        let port =
            hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(&agent_id);

        // Background querier: keeps sending QueryChild(Proc) until aborted,
        // counting successful replies.
        let query_client_proc =
            Proc::direct(ChannelTransport::Unix.any(), "query_client".to_string()).unwrap();
        let (query_client, _qc_handle) = query_client_proc.instance("qc").unwrap();
        let query_port = port.clone();
        let query_proc_id = proc.proc_id().clone();
        let query_count = Arc::new(AtomicUsize::new(0));
        let query_count_clone = query_count.clone();
        let query_task = tokio::spawn(async move {
            loop {
                let (reply_port, reply_rx) = query_client.open_once_port::<IntrospectResult>();
                if query_port
                    .send(
                        &query_client,
                        IntrospectMessage::QueryChild {
                            child_ref: hyperactor_reference::Reference::Proc(query_proc_id.clone()),
                            reply: reply_port.bind(),
                        },
                    )
                    .is_err()
                {
                    break;
                }
                let reply =
                    tokio::time::timeout(std::time::Duration::from_secs(2), reply_rx.recv()).await;
                // Timeouts and closed channels are tolerated; only successful replies count.
                if matches!(reply, Ok(Ok(_))) {
                    query_count_clone.fetch_add(1, Ordering::Relaxed);
                }
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
        });

        const ITERATIONS: usize = 200;
        let mut completed = 0usize;
        let result = tokio::time::timeout(std::time::Duration::from_secs(30), async {
            for i in 0..ITERATIONS {
                let name = format!("churn_{}", i);
                let handle = proc.spawn(&name, ExtraActor).unwrap();
                let actor_id = handle.actor_id().clone();
                if let Some(mut status) = proc.stop_actor(&actor_id, "churn".to_string()) {
                    let _ = tokio::time::timeout(
                        std::time::Duration::from_secs(5),
                        status.wait_for(ActorStatus::is_terminal),
                    )
                    .await;
                }
                completed += 1;
            }
        })
        .await;

        query_task.abort();
        let _ = query_task.await;

        assert!(
            result.is_ok(),
            "spawn/stop loop stalled after {completed}/{ITERATIONS} iterations — \
             DashMap convoy starvation likely"
        );
        assert_eq!(
            completed, ITERATIONS,
            "expected {ITERATIONS} completed iterations, got {completed}"
        );
        assert!(
            query_count.load(Ordering::Relaxed) > 0,
            "concurrent QueryChild queries never succeeded — query task may not have run"
        );

        // The agent must still respond after the churn.
        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
        port.send(
            &client,
            IntrospectMessage::QueryChild {
                child_ref: hyperactor_reference::Reference::Proc(proc.proc_id().clone()),
                reply: reply_port.bind(),
            },
        )
        .unwrap();
        let final_payload =
            tokio::time::timeout(std::time::Duration::from_secs(5), reply_rx.recv())
                .await
                .expect("final QueryChild timed out")
                .expect("final QueryChild channel closed");
        let attrs: hyperactor_config::Attrs =
            serde_json::from_str(&final_payload.attrs).expect("valid attrs JSON");
        assert_eq!(
            attrs.get(crate::introspect::NODE_TYPE).map(String::as_str),
            Some("proc"),
        );
    }

    /// `StreamState` subscribers see Running → Stopping → Stopped. Stopping an
    /// actor whose subscriber has been dropped still succeeds, and the actor
    /// reaches a terminating status.
    #[tokio::test]
    async fn test_stream_state_and_unsubscribe() {
        use hyperactor::Proc;
        use hyperactor::actor::ActorStatus;
        use hyperactor::channel::ChannelTransport;

        use crate::resource::CreateOrUpdateClient;
        use crate::resource::GetStateClient;
        use crate::resource::StopClient;
        use crate::resource::StreamStateClient;

        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
        agent_handle
            .status()
            .wait_for(|s| matches!(s, ActorStatus::Idle))
            .await
            .unwrap();

        let (client, _client_handle) = proc.instance("client").unwrap();
        let agent_ref: hyperactor_reference::ActorRef<ProcAgent> = agent_handle.bind();

        let actor_type = hyperactor::actor::remote::Remote::collect()
            .name_of::<ExtraActor>()
            .unwrap()
            .to_string();
        let actor_params = bincode::serialize(&ExtraActor).unwrap();
        let actor_name = Name::Reserved("test_actor".to_string());

        agent_ref
            .create_or_update(
                &client,
                actor_name.clone(),
                resource::Rank::new(0),
                ActorSpec {
                    actor_type: actor_type.clone(),
                    params_data: actor_params.clone(),
                },
            )
            .await
            .unwrap();

        let (sub_port, mut sub_rx) = client.open_port::<resource::State<ActorState>>();
        agent_ref
            .stream_state(&client, actor_name.clone(), sub_port.bind())
            .await
            .unwrap();

        let initial = sub_rx.recv().await.expect("subscriber channel error");
        assert_eq!(initial.status, resource::Status::Running);
        assert!(initial.state.is_some());

        agent_ref
            .stop(&client, actor_name.clone(), "test".to_string())
            .await
            .unwrap();

        let stopping = sub_rx.recv().await.expect("subscriber channel error");
        assert_eq!(stopping.status, resource::Status::Stopping);

        let stopped = sub_rx.recv().await.expect("subscriber channel error");
        assert_eq!(stopped.status, resource::Status::Stopped);

        let actor_name_2 = Name::Reserved("test_actor_2".to_string());
        agent_ref
            .create_or_update(
                &client,
                actor_name_2.clone(),
                resource::Rank::new(1),
                ActorSpec {
                    actor_type: actor_type.clone(),
                    params_data: actor_params.clone(),
                },
            )
            .await
            .unwrap();

        let (sub_port_2, mut sub_rx_2) = client.open_port::<resource::State<ActorState>>();
        agent_ref
            .stream_state(&client, actor_name_2.clone(), sub_port_2.bind())
            .await
            .unwrap();

        let initial_2 = sub_rx_2.recv().await.expect("subscriber 2 channel error");
        assert_eq!(initial_2.status, resource::Status::Running);

        // Drop the subscriber before stopping: stop must still succeed.
        drop(sub_rx_2);

        agent_ref
            .stop(
                &client,
                actor_name_2.clone(),
                "test unsubscribe".to_string(),
            )
            .await
            .unwrap();

        let (sub_port_3, mut sub_rx_3) = client.open_port::<resource::State<ActorState>>();
        agent_ref
            .stream_state(&client, actor_name_2.clone(), sub_port_3.bind())
            .await
            .unwrap();
        // Bound the wait so a regression fails the test instead of hanging it.
        tokio::time::timeout(std::time::Duration::from_secs(30), async {
            loop {
                let state = sub_rx_3.recv().await.expect("subscriber 3 channel error");
                if state.status.is_terminating() {
                    break;
                }
            }
        })
        .await
        .expect("timed out waiting for test_actor_2 to reach a terminating status");

        let state = agent_ref
            .get_state(&client, actor_name_2.clone())
            .await
            .unwrap();
        assert!(
            state.status.is_terminating(),
            "expected terminating status, got {:?}",
            state.status,
        );
    }
}
2015}