1#![allow(unused_assignments)]
16
17use std::collections::HashMap;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use hyperactor::Actor;
22use hyperactor::ActorAddr;
23use hyperactor::ActorHandle;
24use hyperactor::Addr;
25use hyperactor::Bind;
26use hyperactor::Context;
27use hyperactor::Data;
28use hyperactor::Endpoint as _;
29use hyperactor::Handler;
30use hyperactor::Instance;
31use hyperactor::PortAddr;
32use hyperactor::PortHandle;
33use hyperactor::PortRef;
34use hyperactor::RemoteEndpoint as _;
35use hyperactor::Unbind;
36use hyperactor::actor::handle_undeliverable_message;
37use hyperactor::actor::remote::Remote;
38use hyperactor::mailbox::MessageEnvelope;
39use hyperactor::mailbox::Undeliverable;
40use hyperactor::proc::Proc;
41use hyperactor::supervision::ActorSupervisionEvent;
42use hyperactor_config::CONFIG;
43use hyperactor_config::ConfigAttr;
44use hyperactor_config::Flattrs;
45use hyperactor_config::attrs::declare_attrs;
46use serde::Deserialize;
47use serde::Serialize;
48use typeuri::Named;
49
50use crate::config_dump::ConfigDump;
51use crate::config_dump::ConfigDumpResult;
52use crate::introspect::ProcessMemoryStats;
53use crate::mesh_id::ResourceId;
54use crate::pyspy::PySpyDump;
55use crate::pyspy::PySpyProfile;
56use crate::pyspy::PySpyProfileWorker;
57use crate::pyspy::PySpyWorker;
58use crate::resource;
59
/// Name under which `ProcAgent::boot_v1` spawns the agent actor on its proc.
pub const PROC_AGENT_ACTOR_NAME: &str = "proc_agent";
62
// Configuration attributes and message-header keys used by the proc agent.
// NOTE(review): the two `ConfigAttr::new` strings look like the environment
// variable name and the config key name — confirm against `ConfigAttr`.
declare_attrs! {
    // Read once in `ProcAgent::boot_v1`. When `Some`, the agent posts itself a
    // `SelfCheck` after this delay and again after every check; `None` means
    // no `SelfCheck` is ever scheduled. Defaults to 60 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ORPHAN_TIMEOUT".to_string()),
        Some("mesh_orphan_timeout".to_string()),
    ))
    pub attr MESH_ORPHAN_TIMEOUT: Option<Duration> = Some(Duration::from_secs(60));

    // Delay between periodic `RepublishIntrospect { emit_memory_metrics: true }`
    // messages. A zero duration disables the periodic republish. Defaults to
    // 300 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_PROCESS_MEMORY_METRIC_INTERVAL".to_string()),
        Some("process_memory_metric_interval".to_string()),
    ))
    pub attr PROCESS_MEMORY_METRIC_INTERVAL: Duration = Duration::from_secs(300);

    // Header flag set to `true` on every state message posted to a stream
    // subscriber. `handle_undeliverable_message` checks it to recognise a
    // bounced subscriber update and drop that subscriber.
    pub(crate) attr STREAM_STATE_SUBSCRIBER: bool;
}
92
/// Message the agent posts to itself to republish its introspection
/// attributes.
///
/// It is scheduled with a short delay after a supervision event and, when the
/// memory-metric interval is non-zero, periodically.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
struct RepublishIntrospect {
    /// When true, the handler also records the process memory metrics and
    /// schedules the next periodic republish.
    emit_memory_metrics: bool,
}
wirevalue::register_type!(RepublishIntrospect);
117
118fn collect_live_children(
124 proc: &hyperactor::Proc,
125) -> (
126 Vec<hyperactor::introspect::IntrospectRef>,
127 Vec<crate::introspect::NodeRef>,
128) {
129 let all_keys = proc.all_instance_keys();
130 let mut children = Vec::with_capacity(all_keys.len());
131 let mut system_children = Vec::new();
132 for id in all_keys {
133 if let Some(cell) = proc.get_instance_by_id(&id) {
134 let actor_addr = cell.actor_addr().clone();
135 if cell.is_system() {
136 system_children.push(crate::introspect::NodeRef::Actor(actor_addr.clone()));
137 }
138 children.push(hyperactor::introspect::IntrospectRef::Actor(actor_addr));
139 }
140 }
141 (children, system_children)
142}
143
/// Bookkeeping the agent keeps for one actor it was asked to create.
#[derive(Debug)]
struct ActorInstanceState {
    /// Rank supplied with the create request; used as the single-rank run in
    /// status overlays.
    create_rank: usize,
    /// Outcome of the spawn attempt: the actor's address, or the error that
    /// prevented spawning.
    spawn: Result<ActorAddr, anyhow::Error>,
    /// Set once a stop has been requested for this actor.
    stop_initiated: bool,
    /// Most recent supervision event recorded for the actor, if any. Its
    /// presence makes the state terminal.
    supervision_event: Option<ActorSupervisionEvent>,
    /// Ports that receive the full state on every status change.
    subscribers: Vec<PortRef<resource::State<ActorState>>>,
    /// Keepalive deadline set by `KeepaliveGetState`; `None` until the first
    /// keepalive arrives.
    expiry_time: Option<std::time::SystemTime>,
    /// Starts at 1 and is incremented when a stop is initiated via `Stop` and
    /// when a supervision event is recorded.
    generation: u64,
    /// Pending `WaitRankStatus` requests as `(min_status, reply)`; each is
    /// answered once the status reaches its minimum.
    pending_wait_status: Vec<(resource::Status, PortRef<crate::StatusOverlay>)>,
}
171
172impl ActorInstanceState {
173 fn status(&self) -> resource::Status {
176 match &self.spawn {
177 Err(e) => resource::Status::Failed(e.to_string()),
178 Ok(_) => match &self.supervision_event {
179 Some(event) if event.is_error() => resource::Status::Failed(format!("{}", event)),
180 Some(_) => resource::Status::Stopped,
181 None if self.stop_initiated => resource::Status::Stopping,
182 None => resource::Status::Running,
183 },
184 }
185 }
186
187 fn is_terminal(&self) -> bool {
190 match &self.spawn {
191 Err(_) => true,
192 Ok(_) => self.supervision_event.is_some(),
193 }
194 }
195
196 fn has_errors(&self) -> bool {
198 self.supervision_event
199 .as_ref()
200 .is_some_and(|e| e.is_error())
201 }
202
203 fn to_state(&self, id: &ResourceId) -> resource::State<ActorState> {
206 let status = self.status();
207 let actor_state = self.spawn.as_ref().ok().map(|actor_id| ActorState {
208 actor_id: actor_id.clone(),
209 create_rank: self.create_rank,
210 supervision_events: self.supervision_event.clone().into_iter().collect(),
211 });
212 resource::State {
213 id: id.clone(),
214 status,
215 state: actor_state,
216 generation: self.generation,
217 timestamp: std::time::SystemTime::now(),
218 }
219 }
220
221 fn notify_status_changed(&mut self, cx: &impl hyperactor::context::Actor, id: &ResourceId) {
226 let state = self.to_state(id);
228 for subscriber in &self.subscribers {
229 let mut headers = Flattrs::new();
230 headers.set(STREAM_STATE_SUBSCRIBER, true);
231 subscriber.post_with_headers(cx, headers, state.clone());
232 }
233
234 let status = self.status();
236 self.pending_wait_status.retain(|(min_status, reply)| {
237 if status >= *min_status {
238 let rank = self.create_rank;
239 let overlay =
240 crate::StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status.clone())])
241 .expect("valid single-run overlay");
242 let _ = reply.post(cx, overlay);
243 false
244 } else {
245 true
246 }
247 });
248 }
249}
250
/// Message the agent posts to itself to run the keepalive-expiry check; its
/// handler stops actors whose keepalive deadline has passed.
#[derive(
    Clone,
    Debug,
    Default,
    PartialEq,
    Serialize,
    Deserialize,
    Named,
    Bind,
    Unbind
)]
pub(crate) struct SelfCheck {}
263
/// Actor that manages the other actors on a proc: it spawns and stops them
/// on request, records their supervision events, answers state queries, and
/// publishes proc-level introspection attributes.
#[hyperactor::export(
    handlers=[
        ActorSupervisionEvent,
        resource::CreateOrUpdate<ActorSpec> { cast = true },
        resource::Stop { cast = true },
        resource::StopAll { cast = true },
        resource::GetState<ActorState> { cast = true },
        resource::StreamState<ActorState> { cast = true },
        resource::KeepaliveGetState<ActorState> { cast = true },
        resource::GetRankStatus { cast = true },
        resource::WaitRankStatus { cast = true },
        RepublishIntrospect { cast = true },
        PySpyDump,
        PySpyProfile,
        ConfigDump,
    ]
)]
pub struct ProcAgent {
    /// The proc whose actors this agent manages.
    proc: Proc,
    /// Registry obtained from `Remote::collect()`, used to spawn actors by
    /// type name.
    remote: Remote,
    /// State of every actor this agent was asked to create, keyed by
    /// resource id.
    actor_states: HashMap<ResourceId, ActorInstanceState>,
    /// When true, supervision events are recorded in `actor_states`; when
    /// false, an error event terminates the process with exit code 1.
    record_supervision_events: bool,
    /// True while a delayed `RepublishIntrospect` scheduled by the
    /// supervision handler is outstanding, so only one is queued at a time.
    introspect_dirty: bool,
    /// If present, shutdown sends the exit code here instead of calling
    /// `std::process::exit`.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<i32>>,
    /// Set by `StopAll`; once every tracked actor is terminal the agent
    /// shuts the process down.
    stopping_all: bool,
    /// Interval between `SelfCheck` runs; `None` disables them.
    mesh_orphan_timeout: Option<Duration>,
}
323
324impl ProcAgent {
325 pub(crate) fn boot_v1(
326 proc: Proc,
327 shutdown_tx: Option<tokio::sync::oneshot::Sender<i32>>,
328 ) -> Result<ActorHandle<Self>, anyhow::Error> {
329 let orphan_timeout = hyperactor_config::global::get(MESH_ORPHAN_TIMEOUT);
330 let agent = ProcAgent {
331 proc: proc.clone(),
332 remote: Remote::collect(),
333 actor_states: HashMap::new(),
334 record_supervision_events: true,
335 introspect_dirty: false,
336 shutdown_tx,
337 stopping_all: false,
338 mesh_orphan_timeout: orphan_timeout,
339 };
340 proc.spawn::<Self>(PROC_AGENT_ACTOR_NAME, agent)
341 }
342
343 fn all_actors_terminal(&self) -> bool {
347 self.actor_states.values().all(|state| state.is_terminal())
348 }
349
350 async fn shutdown(&mut self) {
354 let has_errors = self.actor_states.values().any(|state| state.has_errors());
355 let exit_code = if has_errors { 1 } else { 0 };
356
357 let flush_timeout =
358 hyperactor_config::global::get(hyperactor::config::FORWARDER_FLUSH_TIMEOUT);
359 match tokio::time::timeout(flush_timeout, self.proc.flush()).await {
360 Ok(Err(err)) => {
361 tracing::warn!("forwarder flush failed during shutdown: {}", err);
362 }
363 Err(_elapsed) => {
364 tracing::warn!("forwarder flush timed out during shutdown");
365 }
366 Ok(Ok(())) => {}
367 }
368
369 self.proc.join_mailbox_server().await;
373
374 tracing::info!(
375 "shutting down process after all actors reached terminal state (exit_code={})",
376 exit_code,
377 );
378
379 if let Some(tx) = self.shutdown_tx.take() {
380 let _ = tx.send(exit_code);
381 return;
382 }
383 std::process::exit(exit_code);
384 }
385
386 fn stop_actor_by_id(&self, actor_id: &ActorAddr, reason: &str) {
389 tracing::info!(
390 name = "StopActor",
391 %actor_id,
392 actor_name = actor_id.log_name(),
393 %reason,
394 );
395 self.proc.stop_actor(actor_id.id(), reason.to_string());
396 }
397
398 fn publish_introspect_properties(
401 &self,
402 cx: &impl hyperactor::context::Actor,
403 ) -> ProcessMemoryStats {
404 let (mut children, mut system_children) = collect_live_children(&self.proc);
405
406 let mut stopped_children: Vec<crate::introspect::NodeRef> = Vec::new();
410 for id in self.proc.all_terminated_actor_ids() {
411 let child_ref = hyperactor::introspect::IntrospectRef::Actor(id.clone());
412 let node_ref = crate::introspect::NodeRef::Actor(id.clone());
413 stopped_children.push(node_ref.clone());
414 if let Some(snapshot) = self.proc.terminated_snapshot(&id) {
415 let snapshot_attrs: hyperactor_config::Attrs =
416 serde_json::from_str(&snapshot.attrs).unwrap_or_default();
417 if snapshot_attrs
418 .get(hyperactor::introspect::IS_SYSTEM)
419 .copied()
420 .unwrap_or(false)
421 {
422 system_children.push(node_ref);
423 }
424 }
425 if !children.contains(&child_ref) {
426 children.push(child_ref);
427 }
428 }
429
430 let stopped_retention_cap =
431 hyperactor_config::global::get(hyperactor::config::TERMINATED_SNAPSHOT_RETENTION);
432
433 let failed_actor_count = self
435 .actor_states
436 .values()
437 .filter(|s| s.has_errors())
438 .count();
439
440 let num_live = children.len();
442 let mut attrs = hyperactor_config::Attrs::new();
443 attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
444 attrs.set(
445 crate::introspect::PROC_NAME,
446 self.proc
447 .proc_addr()
448 .label()
449 .map(|l| l.as_str().to_string())
450 .unwrap_or_else(|| self.proc.proc_addr().id().to_string()),
451 );
452 attrs.set(crate::introspect::NUM_ACTORS, num_live);
453 attrs.set(hyperactor::introspect::CHILDREN, children);
454 attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
455 attrs.set(crate::introspect::STOPPED_CHILDREN, stopped_children);
456 attrs.set(
457 crate::introspect::STOPPED_RETENTION_CAP,
458 stopped_retention_cap,
459 );
460 attrs.set(crate::introspect::IS_POISONED, failed_actor_count > 0);
461 attrs.set(crate::introspect::FAILED_ACTOR_COUNT, failed_actor_count);
462
463 let memory = crate::introspect::ProcessMemoryStats::read_from_procfs();
468 memory.to_attrs(&mut attrs);
469
470 let queue_total = self.proc.queue_depth_total();
472 attrs.set(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_TOTAL, queue_total);
473
474 let mut queue_max: u64 = 0;
476 for actor_id in self.proc.all_instance_keys() {
477 if let Some(cell) = self.proc.get_instance_by_id(&actor_id) {
478 queue_max = queue_max.max(cell.queue_depth());
479 }
480 }
481 attrs.set(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_MAX, queue_max);
482
483 attrs.set(
485 crate::introspect::ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK,
486 self.proc.queue_depth_high_water_mark(),
487 );
488 attrs.set(
489 crate::introspect::LAST_NONZERO_QUEUE_DEPTH_AGE_MS,
490 self.proc.last_nonzero_queue_depth_age_ms(),
491 );
492
493 cx.instance().publish_attrs(attrs);
494
495 memory
496 }
497}
498
#[async_trait]
impl Actor for ProcAgent {
    /// Initializes the agent: marks it as a system actor, registers its port
    /// as the proc's supervision coordinator, publishes the initial
    /// introspection attributes, installs the query-child handler, and
    /// schedules the self-addressed `SelfCheck` and `RepublishIntrospect`
    /// messages.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        this.set_system();
        self.proc.set_supervision_coordinator(this.port())?;
        let _ = self.publish_introspect_properties(this);

        // The handler closure owns its own proc handle and the agent's address;
        // it has no access to `self`.
        let proc = self.proc.clone();
        let self_id = this.self_addr().clone();
        this.set_query_child_handler(move |child_ref| {
            use hyperactor::introspect::IntrospectResult;

            // Case 1: a terminated actor for which a snapshot is retained.
            if let Addr::Actor(actor_ref) = child_ref
                && let Some(snapshot) = proc.terminated_snapshot(actor_ref)
            {
                return snapshot;
            }

            // Case 2: the query is about this proc itself; build a fresh
            // proc-level result.
            if let Addr::Proc(proc_ref) = child_ref
                && *proc_ref == proc.proc_addr()
            {
                let (mut children, mut system_children) = collect_live_children(&proc);

                // Merge terminated actors into the child lists.
                let mut stopped_children: Vec<crate::introspect::NodeRef> = Vec::new();
                for id in proc.all_terminated_actor_ids() {
                    let child_ref = hyperactor::introspect::IntrospectRef::Actor(id.clone());
                    let node_ref = crate::introspect::NodeRef::Actor(id.clone());
                    stopped_children.push(node_ref.clone());
                    if let Some(snapshot) = proc.terminated_snapshot(&id) {
                        // Unparseable snapshot attrs are treated as empty.
                        let snapshot_attrs: hyperactor_config::Attrs =
                            serde_json::from_str(&snapshot.attrs).unwrap_or_default();
                        if snapshot_attrs
                            .get(hyperactor::introspect::IS_SYSTEM)
                            .copied()
                            .unwrap_or(false)
                        {
                            system_children.push(node_ref);
                        }
                    }
                    if !children.contains(&child_ref) {
                        children.push(child_ref);
                    }
                }

                let stopped_retention_cap = hyperactor_config::global::get(
                    hyperactor::config::TERMINATED_SNAPSHOT_RETENTION,
                );

                // The closure cannot see `actor_states`, so the failure
                // figures are taken from the attrs the agent last published;
                // they default to "not poisoned, 0 failures" if unavailable.
                let (is_poisoned, failed_actor_count) = proc
                    .get_instance(&self_id)
                    .and_then(|cell| cell.published_attrs())
                    .map(|attrs| {
                        let is_poisoned = attrs
                            .get(crate::introspect::IS_POISONED)
                            .copied()
                            .unwrap_or(false);
                        let failed_actor_count = attrs
                            .get(crate::introspect::FAILED_ACTOR_COUNT)
                            .copied()
                            .unwrap_or(0);
                        (is_poisoned, failed_actor_count)
                    })
                    .unwrap_or((false, 0));

                // Counted after the merge above, so terminated children are
                // included.
                let num_live = children.len();
                let mut attrs = hyperactor_config::Attrs::new();
                attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
                attrs.set(
                    crate::introspect::PROC_NAME,
                    proc_ref
                        .label()
                        .map(|l| l.as_str().to_string())
                        .unwrap_or_else(|| proc_ref.id().to_string()),
                );
                attrs.set(crate::introspect::NUM_ACTORS, num_live);
                attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
                attrs.set(crate::introspect::STOPPED_CHILDREN, stopped_children);
                attrs.set(
                    crate::introspect::STOPPED_RETENTION_CAP,
                    stopped_retention_cap,
                );
                attrs.set(crate::introspect::IS_POISONED, is_poisoned);
                attrs.set(crate::introspect::FAILED_ACTOR_COUNT, failed_actor_count);

                let memory = crate::introspect::ProcessMemoryStats::read_from_procfs();
                memory.to_attrs(&mut attrs);
                attrs.set(
                    crate::introspect::ACTOR_WORK_QUEUE_DEPTH_TOTAL,
                    proc.queue_depth_total(),
                );
                // Largest per-actor queue depth among resolvable instances.
                let mut queue_max: u64 = 0;
                for aid in proc.all_instance_keys() {
                    if let Some(cell) = proc.get_instance_by_id(&aid) {
                        queue_max = queue_max.max(cell.queue_depth());
                    }
                }
                attrs.set(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_MAX, queue_max);
                attrs.set(
                    crate::introspect::ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK,
                    proc.queue_depth_high_water_mark(),
                );
                attrs.set(
                    crate::introspect::LAST_NONZERO_QUEUE_DEPTH_AGE_MS,
                    proc.last_nonzero_queue_depth_age_ms(),
                );

                // Serialization failure degrades to an empty JSON object.
                let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());

                return IntrospectResult {
                    identity: hyperactor::introspect::IntrospectRef::Proc(proc_ref.clone()),
                    attrs: attrs_json,
                    children,
                    parent: None,
                    as_of: std::time::SystemTime::now(),
                };
            }

            // Case 3: unknown reference; answer with a "not_found" error
            // result. A port reference is reported under its owning actor.
            {
                let mut error_attrs = hyperactor_config::Attrs::new();
                error_attrs.set(hyperactor::introspect::ERROR_CODE, "not_found".to_string());
                error_attrs.set(
                    hyperactor::introspect::ERROR_MESSAGE,
                    format!("child {} not found", child_ref),
                );
                let identity = match child_ref {
                    Addr::Proc(p) => hyperactor::introspect::IntrospectRef::Proc(p.clone()),
                    Addr::Actor(a) => hyperactor::introspect::IntrospectRef::Actor(a.clone()),
                    Addr::Port(p) => hyperactor::introspect::IntrospectRef::Actor(p.actor_addr()),
                };
                IntrospectResult {
                    identity,
                    attrs: serde_json::to_string(&error_attrs).unwrap_or_else(|_| "{}".to_string()),
                    children: Vec::new(),
                    parent: None,
                    as_of: std::time::SystemTime::now(),
                }
            }
        });

        // Start the keepalive-expiry check loop if a timeout is configured.
        if let Some(delay) = &self.mesh_orphan_timeout {
            this.post_after(this, SelfCheck::default(), *delay);
        }
        // Periodic memory metrics are scheduled only on Linux and only for a
        // non-zero interval.
        if cfg!(target_os = "linux") {
            let interval = hyperactor_config::global::get(PROCESS_MEMORY_METRIC_INTERVAL);
            if !interval.is_zero() {
                this.post_after(
                    this,
                    RepublishIntrospect {
                        emit_memory_metrics: true,
                    },
                    interval,
                );
            }
        }
        Ok(())
    }

    /// Handles a returned message. If it carries the
    /// `STREAM_STATE_SUBSCRIBER` header set to true, the destination port is
    /// removed from every actor's subscriber list; anything else is passed to
    /// the default `handle_undeliverable_message`.
    async fn handle_undeliverable_message(
        &mut self,
        cx: &Instance<Self>,
        envelope: Undeliverable<MessageEnvelope>,
    ) -> Result<(), anyhow::Error> {
        let Some(returned) = envelope.as_message() else {
            return handle_undeliverable_message(cx, envelope);
        };
        if let Some(true) = returned.headers().get(STREAM_STATE_SUBSCRIBER) {
            // Rebuild a typed port reference from the bounced destination so
            // it can be compared with the stored subscribers.
            let dest_port_id: PortAddr = returned.dest().clone();
            let port = PortRef::<resource::State<ActorState>>::attest(dest_port_id);
            for instance in self.actor_states.values_mut() {
                instance.subscribers.retain(|s| s != &port);
            }
            Ok(())
        } else {
            handle_undeliverable_message(cx, envelope)
        }
    }
}
689
690#[async_trait]
691impl Handler<ActorSupervisionEvent> for ProcAgent {
692 async fn handle(
693 &mut self,
694 cx: &Context<Self>,
695 event: ActorSupervisionEvent,
696 ) -> anyhow::Result<()> {
697 if self.record_supervision_events {
698 if event.is_error() {
699 tracing::warn!(
700 name = "SupervisionEvent",
701 proc_id = %self.proc.proc_addr(),
702 %event,
703 "recording supervision error",
704 );
705 } else {
706 tracing::debug!(
707 name = "SupervisionEvent",
708 proc_id = %self.proc.proc_addr(),
709 %event,
710 "recording non-error supervision event",
711 );
712 }
713 if let Some((id, instance)) = self.actor_states.iter_mut().find(|(_, s)| {
715 s.spawn
716 .as_ref()
717 .ok()
718 .is_some_and(|actor_id| actor_id.id() == event.actor_id.id())
719 }) {
720 instance.supervision_event = Some(event.clone());
721 instance.generation += 1;
722 let id = id.clone();
723 instance.notify_status_changed(cx, &id);
724 }
725 if !self.introspect_dirty {
729 self.introspect_dirty = true;
730 cx.post_after(
731 cx,
732 RepublishIntrospect {
733 emit_memory_metrics: false,
734 },
735 std::time::Duration::from_millis(100),
736 );
737 }
738
739 if self.stopping_all && self.all_actors_terminal() {
742 self.shutdown().await;
743 }
744 }
745 if !self.record_supervision_events && event.is_error() {
746 tracing::error!(
749 name = "supervision_event_transmit_failed",
750 proc_id = %cx.self_addr().proc_addr(),
751 %event,
752 "could not propagate supervision event, crashing",
753 );
754
755 std::process::exit(1);
758 }
759 Ok(())
760 }
761}
762
763#[async_trait]
764impl Handler<RepublishIntrospect> for ProcAgent {
765 async fn handle(&mut self, cx: &Context<Self>, msg: RepublishIntrospect) -> anyhow::Result<()> {
766 self.introspect_dirty = false;
767 let memory = self.publish_introspect_properties(cx);
768 if msg.emit_memory_metrics {
769 let proc_id = self.proc.proc_addr().to_string();
770 let pid = std::process::id() as i64;
771 if let Some(rss) = memory.process_rss_bytes {
772 crate::metrics::PROCESS_RSS_BYTES.record(
773 rss as f64,
774 hyperactor_telemetry::kv_pairs!(
775 "proc_id" => proc_id.clone(),
776 "pid" => pid,
777 ),
778 );
779 }
780 if let Some(vm) = memory.process_vm_size_bytes {
781 crate::metrics::PROCESS_VM_SIZE_BYTES.record(
782 vm as f64,
783 hyperactor_telemetry::kv_pairs!(
784 "proc_id" => proc_id,
785 "pid" => pid,
786 ),
787 );
788 }
789 let interval = hyperactor_config::global::get(PROCESS_MEMORY_METRIC_INTERVAL);
790 if !interval.is_zero() {
791 cx.post_after(
792 cx,
793 RepublishIntrospect {
794 emit_memory_metrics: true,
795 },
796 interval,
797 );
798 }
799 }
800 Ok(())
801 }
802}
803
#[async_trait]
impl Handler<PySpyDump> for ProcAgent {
    /// Delegates the dump request to `PySpyWorker::spawn_and_forward`,
    /// handing over the request options and the result port.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        message: PySpyDump,
    ) -> Result<(), anyhow::Error> {
        PySpyWorker::spawn_and_forward(cx, message.opts, message.result)
    }
}
814
#[async_trait]
impl Handler<PySpyProfile> for ProcAgent {
    /// Delegates the profile request to
    /// `PySpyProfileWorker::spawn_and_forward`, handing over the request and
    /// the result port.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        message: PySpyProfile,
    ) -> Result<(), anyhow::Error> {
        PySpyProfileWorker::spawn_and_forward(cx, message.request, message.result)
    }
}
825
826#[async_trait]
827impl Handler<ConfigDump> for ProcAgent {
828 async fn handle(
829 &mut self,
830 cx: &Context<Self>,
831 message: ConfigDump,
832 ) -> Result<(), anyhow::Error> {
833 let entries = hyperactor_config::global::config_entries();
834 let _ = message.result.post(cx, ConfigDumpResult { entries });
837 Ok(())
838 }
839}
840
/// Specification of an actor to create, carried by
/// `resource::CreateOrUpdate<ActorSpec>`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct ActorSpec {
    /// Actor type name handed to the remote registry when spawning.
    pub actor_type: String,
    /// Spawn parameters passed through to the spawn call unchanged.
    /// NOTE(review): presumably the serialized actor params — confirm.
    pub params_data: Data,
}
wirevalue::register_type!(ActorSpec);
852
/// Externally visible state of a successfully spawned actor, reported as the
/// payload of `resource::State<ActorState>`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct ActorState {
    /// Address of the spawned actor.
    pub actor_id: ActorAddr,
    /// Rank the actor was created with.
    pub create_rank: usize,
    /// Supervision events recorded for the actor; the agent fills this with
    /// at most the single most recent event.
    pub supervision_events: Vec<ActorSupervisionEvent>,
}
wirevalue::register_type!(ActorState);
864
865#[async_trait]
866impl Handler<resource::CreateOrUpdate<ActorSpec>> for ProcAgent {
867 async fn handle(
868 &mut self,
869 cx: &Context<Self>,
870 create_or_update: resource::CreateOrUpdate<ActorSpec>,
871 ) -> anyhow::Result<()> {
872 if self.actor_states.contains_key(&create_or_update.id) {
873 return Ok(());
875 }
876 let create_rank = create_or_update.rank.unwrap();
877 if self.actor_states.values().any(|s| s.has_errors()) {
881 self.actor_states.insert(
882 create_or_update.id.clone(),
883 ActorInstanceState {
884 spawn: Err(anyhow::anyhow!(
885 "Cannot spawn new actors on mesh with supervision events"
886 )),
887 create_rank,
888 stop_initiated: false,
889 supervision_event: None,
890 subscribers: Vec::new(),
891 expiry_time: None,
892 generation: 1,
893 pending_wait_status: Vec::new(),
894 },
895 );
896 return Ok(());
897 }
898
899 let ActorSpec {
900 actor_type,
901 params_data,
902 } = create_or_update.spec;
903 self.actor_states.insert(
904 create_or_update.id.clone(),
905 ActorInstanceState {
906 create_rank,
907 spawn: self
908 .remote
909 .gspawn(
910 &self.proc,
911 &actor_type,
912 create_or_update.id.uid().clone(),
913 params_data,
914 cx.headers().clone(),
915 )
916 .await,
917 stop_initiated: false,
918 supervision_event: None,
919 subscribers: Vec::new(),
920 expiry_time: None,
921 generation: 1,
922 pending_wait_status: Vec::new(),
923 },
924 );
925
926 let _ = self.publish_introspect_properties(cx);
927 Ok(())
928 }
929}
930
931#[async_trait]
932impl Handler<resource::Stop> for ProcAgent {
933 async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
934 let actor_id = match self.actor_states.get_mut(&message.id) {
935 Some(actor_state) => {
936 let id = actor_state.spawn.as_ref().ok().cloned();
937 if id.is_some() && !actor_state.stop_initiated {
938 actor_state.stop_initiated = true;
939 actor_state.generation += 1;
940 actor_state.notify_status_changed(cx, &message.id);
941 id
942 } else {
943 None
944 }
945 }
946 None => None,
947 };
948 if let Some(actor_id) = actor_id {
949 self.stop_actor_by_id(&actor_id, &message.reason);
950 }
951
952 Ok(())
953 }
954}
955
956#[async_trait]
960impl Handler<resource::StopAll> for ProcAgent {
961 async fn handle(
962 &mut self,
963 _cx: &Context<Self>,
964 message: resource::StopAll,
965 ) -> anyhow::Result<()> {
966 self.stopping_all = true;
967
968 let to_stop: Vec<ActorAddr> = self
970 .actor_states
971 .values_mut()
972 .filter_map(|state| {
973 if state.stop_initiated {
974 return None;
975 }
976 state.stop_initiated = true;
977 state.spawn.as_ref().ok().cloned()
978 })
979 .collect();
980
981 for actor_id in &to_stop {
982 self.stop_actor_by_id(actor_id, &message.reason);
983 }
984
985 if self.all_actors_terminal() {
987 self.shutdown().await;
988 }
989
990 Ok(())
991 }
992}
993
994#[async_trait]
995impl Handler<resource::GetRankStatus> for ProcAgent {
996 async fn handle(
997 &mut self,
998 cx: &Context<Self>,
999 get_rank_status: resource::GetRankStatus,
1000 ) -> anyhow::Result<()> {
1001 use crate::StatusOverlay;
1002 use crate::resource::Status;
1003
1004 let (rank, status) = match self.actor_states.get(&get_rank_status.id) {
1005 Some(state) => (state.create_rank, state.status()),
1006 None => (usize::MAX, Status::NotExist),
1007 };
1008
1009 let overlay = if rank == usize::MAX {
1012 StatusOverlay::new()
1013 } else {
1014 StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
1015 .expect("valid single-run overlay")
1016 };
1017 get_rank_status.reply.post(cx, overlay);
1018 Ok(())
1019 }
1020}
1021
1022#[async_trait]
1023impl Handler<resource::WaitRankStatus> for ProcAgent {
1024 async fn handle(
1025 &mut self,
1026 cx: &Context<Self>,
1027 msg: resource::WaitRankStatus,
1028 ) -> anyhow::Result<()> {
1029 use crate::StatusOverlay;
1030 use crate::resource::Status;
1031
1032 let (rank, status) = match self.actor_states.get(&msg.id) {
1033 Some(state) => (state.create_rank, state.status()),
1034 None => (usize::MAX, Status::NotExist),
1035 };
1036
1037 if status >= msg.min_status || rank == usize::MAX {
1039 let overlay = if rank == usize::MAX {
1040 StatusOverlay::new()
1041 } else {
1042 StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
1043 .expect("valid single-run overlay")
1044 };
1045 let _ = msg.reply.post(cx, overlay);
1046 return Ok(());
1047 }
1048
1049 if let Some(state) = self.actor_states.get_mut(&msg.id) {
1052 state.pending_wait_status.push((msg.min_status, msg.reply));
1053 }
1054 Ok(())
1055 }
1056}
1057
1058#[async_trait]
1059impl Handler<resource::GetState<ActorState>> for ProcAgent {
1060 async fn handle(
1061 &mut self,
1062 cx: &Context<Self>,
1063 get_state: resource::GetState<ActorState>,
1064 ) -> anyhow::Result<()> {
1065 let state = match self.actor_states.get(&get_state.id) {
1066 Some(instance) => instance.to_state(&get_state.id),
1067 None => resource::State {
1068 id: get_state.id.clone(),
1069 status: resource::Status::NotExist,
1070 state: None,
1071 generation: 0,
1072 timestamp: std::time::SystemTime::now(),
1073 },
1074 };
1075
1076 get_state.reply.post(cx, state);
1077 Ok(())
1078 }
1079}
1080
1081#[async_trait]
1082impl Handler<resource::StreamState<ActorState>> for ProcAgent {
1083 async fn handle(
1084 &mut self,
1085 cx: &Context<Self>,
1086 stream_state: resource::StreamState<ActorState>,
1087 ) -> anyhow::Result<()> {
1088 let state = match self.actor_states.get_mut(&stream_state.id) {
1089 Some(instance) => {
1090 let state = instance.to_state(&stream_state.id);
1091 instance.subscribers.push(stream_state.subscriber.clone());
1092 state
1093 }
1094 None => resource::State {
1095 id: stream_state.id.clone(),
1096 status: resource::Status::NotExist,
1097 state: None,
1098 generation: 0,
1099 timestamp: std::time::SystemTime::now(),
1100 },
1101 };
1102
1103 let mut headers = Flattrs::new();
1105 headers.set(STREAM_STATE_SUBSCRIBER, true);
1106 stream_state
1107 .subscriber
1108 .post_with_headers(cx, headers, state);
1109 Ok(())
1110 }
1111}
1112
1113#[async_trait]
1114impl Handler<resource::KeepaliveGetState<ActorState>> for ProcAgent {
1115 async fn handle(
1116 &mut self,
1117 cx: &Context<Self>,
1118 message: resource::KeepaliveGetState<ActorState>,
1119 ) -> anyhow::Result<()> {
1120 if let Ok(instance_state) =
1122 self.actor_states
1123 .get_mut(&message.get_state.id)
1124 .ok_or_else(|| {
1125 anyhow::anyhow!(
1126 "attempting to register a keepalive for an actor that doesn't exist: {}",
1127 message.get_state.id
1128 )
1129 })
1130 {
1131 instance_state.expiry_time = Some(message.expires_after);
1132 }
1133
1134 <Self as Handler<resource::GetState<ActorState>>>::handle(self, cx, message.get_state).await
1136 }
1137}
1138
/// Request for a new client instance created on the agent's proc.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct NewClientInstance {
    /// Port on which the newly created instance is returned.
    #[reply]
    pub client_instance: PortHandle<Instance<()>>,
}
1146
#[async_trait]
impl Handler<NewClientInstance> for ProcAgent {
    /// Creates a client named "client" on the proc and posts its instance to
    /// the reply port. Fails if the proc cannot create the client.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        NewClientInstance { client_instance }: NewClientInstance,
    ) -> anyhow::Result<()> {
        // The second tuple element (the client's handle) is not kept.
        let (instance, _handle) = self.proc.client("client")?;
        client_instance.post(cx, instance);
        Ok(())
    }
}
1159
/// Request for a handle to the proc the agent manages.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct GetProc {
    /// Port on which the proc is returned.
    #[reply]
    pub proc: PortHandle<Proc>,
}
1167
#[async_trait]
impl Handler<GetProc> for ProcAgent {
    /// Posts a clone of the agent's proc to the reply port.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        GetProc { proc }: GetProc,
    ) -> anyhow::Result<()> {
        proc.post(cx, self.proc.clone());
        Ok(())
    }
}
1179
1180#[async_trait]
1181impl Handler<SelfCheck> for ProcAgent {
1182 async fn handle(&mut self, cx: &Context<Self>, _: SelfCheck) -> anyhow::Result<()> {
1183 let Some(duration) = &self.mesh_orphan_timeout else {
1189 return Ok(());
1190 };
1191 let duration = *duration;
1192 let now = std::time::SystemTime::now();
1193
1194 let expired: Vec<(ResourceId, ActorAddr)> = self
1196 .actor_states
1197 .iter()
1198 .filter_map(|(id, state)| {
1199 let expiry = state.expiry_time?;
1200 if now > expiry
1202 && !state.stop_initiated
1203 && let Ok(actor_id) = &state.spawn
1204 {
1205 return Some((id.clone(), actor_id.clone()));
1206 }
1207 None
1208 })
1209 .collect();
1210
1211 if !expired.is_empty() {
1212 tracing::info!(
1213 "stopping {} orphaned actors past their keepalive expiry",
1214 expired.len(),
1215 );
1216 }
1217
1218 for (id, actor_id) in expired {
1219 if let Some(state) = self.actor_states.get_mut(&id) {
1220 state.stop_initiated = true;
1221 }
1222 self.stop_actor_by_id(&actor_id, "orphaned");
1223 }
1224
1225 cx.post_after(cx, SelfCheck::default(), duration);
1227 Ok(())
1228 }
1229}
1230
1231#[cfg(test)]
1232mod tests {
1233 use std::sync::Arc;
1234
1235 use hyperactor::ActorRef;
1236
1237 use super::*;
1238
    /// Minimal actor with no handlers; the tests spawn it directly on the
    /// proc to add an extra child.
    #[derive(Debug, Default, Serialize, Deserialize)]
    #[hyperactor::export(handlers = [])]
    struct ExtraActor;
    impl hyperactor::Actor for ExtraActor {}
    hyperactor::register_spawnable!(ExtraActor);
    /// Checks that `QueryChild(Proc)` sent to the agent reports a proc node
    /// whose children include the agent itself, and that an actor spawned
    /// directly on the proc afterwards shows up in a second query.
    #[tokio::test]
    async fn test_query_child_proc_returns_live_children() {
        use hyperactor::Proc;
        use hyperactor::actor::ActorStatus;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::introspect::IntrospectMessage;
        use hyperactor::introspect::IntrospectResult;

        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();

        // Wait for the agent to report `Idle` before querying it
        // (presumably this means `init` has completed — confirm).
        agent_handle
            .status()
            .wait_for(|s| matches!(s, ActorStatus::Idle))
            .await
            .unwrap();

        // The queries are sent from a client living on a separate proc.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.client("client").unwrap();

        let agent_id: ActorAddr = proc.proc_addr().actor_addr(PROC_AGENT_ACTOR_NAME);
        let port = PortRef::<IntrospectMessage>::attest_handler_port(&agent_id);

        // Sends one QueryChild(Proc) request and returns the receiver for its
        // reply.
        let query = |client: &hyperactor::Instance<()>| {
            let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
            port.post(
                client,
                IntrospectMessage::QueryChild {
                    child_ref: Addr::Proc(proc.proc_addr().clone()),
                    reply: reply_port.bind(),
                },
            );
            reply_rx
        };
        // Awaits a reply, failing the test if none arrives within 5 seconds.
        let recv = |rx: hyperactor::mailbox::OncePortReceiver<IntrospectResult>| async move {
            tokio::time::timeout(std::time::Duration::from_secs(5), rx.recv())
                .await
                .expect("QueryChild(Proc) timed out — reply never delivered")
                .expect("reply channel closed")
        };

        // First query: only the agent is expected among the children.
        let payload = recv(query(&client)).await;
        let attrs: hyperactor_config::Attrs =
            serde_json::from_str(&payload.attrs).expect("valid attrs JSON");
        assert_eq!(
            attrs.get(crate::introspect::NODE_TYPE).map(String::as_str),
            Some("proc"),
            "expected node_type=proc in attrs, got {:?}",
            payload.attrs
        );
        assert!(
            payload
                .children
                .iter()
                .any(|c| c.to_string().contains(PROC_AGENT_ACTOR_NAME)),
            "initial children {:?} should contain proc_agent",
            payload.children
        );
        let initial_count = payload.children.len();

        // Spawn an actor directly on the proc, bypassing the agent.
        proc.spawn("extra_actor", ExtraActor).unwrap();

        // Second query: the new actor must be visible and the child count
        // must have grown.
        let payload2 = recv(query(&client)).await;
        let attrs2: hyperactor_config::Attrs =
            serde_json::from_str(&payload2.attrs).expect("valid attrs JSON");
        assert_eq!(
            attrs2.get(crate::introspect::NODE_TYPE).map(String::as_str),
            Some("proc"),
            "expected node_type=proc in attrs, got {:?}",
            payload2.attrs
        );
        assert!(
            payload2
                .children
                .iter()
                .any(|c| c.to_string().contains("extra_actor")),
            "after direct spawn, children {:?} should contain extra_actor",
            payload2.children
        );
        assert!(
            payload2.children.len() > initial_count,
            "expected at least {} children after direct spawn, got {:?}",
            initial_count + 1,
            payload2.children
        );
    }
1354
1355 #[tokio::test]
1362 async fn test_rapid_spawn_stop_does_not_stall_proc_agent() {
1363 use std::sync::Arc;
1364 use std::sync::atomic::AtomicUsize;
1365 use std::sync::atomic::Ordering;
1366
1367 use hyperactor::Proc;
1368 use hyperactor::actor::ActorStatus;
1369 use hyperactor::channel::ChannelTransport;
1370 use hyperactor::introspect::IntrospectMessage;
1371 use hyperactor::introspect::IntrospectResult;
1372
1373 let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
1374 let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
1375
1376 agent_handle
1377 .status()
1378 .wait_for(|s| matches!(s, ActorStatus::Idle))
1379 .await
1380 .unwrap();
1381
1382 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1383 let (client, _client_handle) = client_proc.client("client").unwrap();
1384
1385 let agent_id: ActorAddr = proc.proc_addr().actor_addr(PROC_AGENT_ACTOR_NAME);
1386 let port = PortRef::<IntrospectMessage>::attest_handler_port(&agent_id);
1387
1388 let query_client_proc =
1390 Proc::direct(ChannelTransport::Unix.any(), "query_client".to_string()).unwrap();
1391 let (query_client, _qc_handle) = query_client_proc.client("qc").unwrap();
1392 let query_port = port.clone();
1393 let query_proc_id = proc.proc_addr().clone();
1394 let query_count = Arc::new(AtomicUsize::new(0));
1395 let query_count_clone = query_count.clone();
1396 let query_task = tokio::spawn(async move {
1397 loop {
1398 let (reply_port, reply_rx) = query_client.open_once_port::<IntrospectResult>();
1399 query_port.post(
1400 &query_client,
1401 IntrospectMessage::QueryChild {
1402 child_ref: Addr::Proc(query_proc_id.clone()),
1403 reply: reply_port.bind(),
1404 },
1405 );
1406 match tokio::time::timeout(std::time::Duration::from_secs(2), reply_rx.recv()).await
1407 {
1408 Ok(Ok(_)) => {
1409 query_count_clone.fetch_add(1, Ordering::Relaxed);
1410 }
1411 _ => {} }
1413 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1414 }
1415 });
1416
1417 const ITERATIONS: usize = 200;
1419 let mut completed = 0usize;
1420 let result = tokio::time::timeout(std::time::Duration::from_secs(30), async {
1421 for i in 0..ITERATIONS {
1422 let name = format!("churn_{}", i);
1423 let handle = proc.spawn(&name, ExtraActor).unwrap();
1424 let actor_id = handle.actor_addr().clone();
1425 if let Some(mut status) = proc.stop_actor(actor_id.id(), "churn".to_string()) {
1426 let _ = tokio::time::timeout(
1427 std::time::Duration::from_secs(5),
1428 status.wait_for(ActorStatus::is_terminal),
1429 )
1430 .await;
1431 }
1432 completed += 1;
1433 }
1434 })
1435 .await;
1436
1437 query_task.abort();
1438 let _ = query_task.await; assert!(
1441 result.is_ok(),
1442 "spawn/stop loop stalled after {completed}/{ITERATIONS} iterations — \
1443 DashMap convoy starvation likely"
1444 );
1445 assert_eq!(
1446 completed, ITERATIONS,
1447 "expected {ITERATIONS} completed iterations, got {completed}"
1448 );
1449 assert!(
1450 query_count.load(Ordering::Relaxed) > 0,
1451 "concurrent QueryChild queries never succeeded — query task may not have run"
1452 );
1453
1454 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1456 port.post(
1457 &client,
1458 IntrospectMessage::QueryChild {
1459 child_ref: Addr::Proc(proc.proc_addr().clone()),
1460 reply: reply_port.bind(),
1461 },
1462 );
1463 let final_payload =
1464 tokio::time::timeout(std::time::Duration::from_secs(5), reply_rx.recv())
1465 .await
1466 .expect("final QueryChild timed out")
1467 .expect("final QueryChild channel closed");
1468 let attrs: hyperactor_config::Attrs =
1469 serde_json::from_str(&final_payload.attrs).expect("valid attrs JSON");
1470 assert_eq!(
1471 attrs.get(crate::introspect::NODE_TYPE).map(String::as_str),
1472 Some("proc"),
1473 );
1474 }
1475
1476 #[tokio::test]
1477 async fn test_stream_state_and_unsubscribe() {
1478 use hyperactor::Proc;
1479 use hyperactor::actor::ActorStatus;
1480 use hyperactor::channel::ChannelTransport;
1481
1482 use crate::resource::CreateOrUpdateClient;
1483 use crate::resource::GetStateClient;
1484 use crate::resource::StopClient;
1485 use crate::resource::StreamStateClient;
1486
1487 let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
1488 let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
1489 agent_handle
1490 .status()
1491 .wait_for(|s| matches!(s, ActorStatus::Idle))
1492 .await
1493 .unwrap();
1494
1495 let (client, _client_handle) = proc.client("client").unwrap();
1496 let agent_ref: ActorRef<ProcAgent> = agent_handle.bind();
1497
1498 let actor_type = hyperactor::actor::remote::Remote::collect()
1499 .name_of::<ExtraActor>()
1500 .unwrap()
1501 .to_string();
1502 let actor_params =
1503 bincode::serde::encode_to_vec(&ExtraActor, bincode::config::legacy()).unwrap();
1504 let actor_name = ResourceId::singleton(hyperactor::id::Label::new("test-actor").unwrap());
1505
1506 agent_ref
1508 .create_or_update(
1509 &client,
1510 actor_name.clone(),
1511 resource::Rank::new(0),
1512 ActorSpec {
1513 actor_type: actor_type.clone(),
1514 params_data: actor_params.clone(),
1515 },
1516 )
1517 .await
1518 .unwrap();
1519
1520 let (sub_port, mut sub_rx) = client.open_port::<resource::State<ActorState>>();
1522 agent_ref
1523 .stream_state(&client, actor_name.clone(), sub_port.bind())
1524 .await
1525 .unwrap();
1526
1527 let initial = sub_rx.recv().await.expect("subscriber channel error");
1529 assert_eq!(initial.status, resource::Status::Running);
1530 assert!(initial.state.is_some());
1531
1532 agent_ref
1534 .stop(&client, actor_name.clone(), "test".to_string())
1535 .await
1536 .unwrap();
1537
1538 let stopping = sub_rx.recv().await.expect("subscriber channel error");
1539 assert_eq!(stopping.status, resource::Status::Stopping);
1540
1541 let stopped = sub_rx.recv().await.expect("subscriber channel error");
1543 assert_eq!(stopped.status, resource::Status::Stopped);
1544
1545 let actor_name_2 =
1547 ResourceId::singleton(hyperactor::id::Label::new("test-actor-2").unwrap());
1548 agent_ref
1549 .create_or_update(
1550 &client,
1551 actor_name_2.clone(),
1552 resource::Rank::new(1),
1553 ActorSpec {
1554 actor_type: actor_type.clone(),
1555 params_data: actor_params.clone(),
1556 },
1557 )
1558 .await
1559 .unwrap();
1560
1561 let (sub_port_2, mut sub_rx_2) = client.open_port::<resource::State<ActorState>>();
1562 agent_ref
1563 .stream_state(&client, actor_name_2.clone(), sub_port_2.bind())
1564 .await
1565 .unwrap();
1566
1567 let initial_2 = sub_rx_2.recv().await.expect("subscriber 2 channel error");
1568 assert_eq!(initial_2.status, resource::Status::Running);
1569
1570 drop(sub_rx_2);
1572
1573 agent_ref
1577 .stop(
1578 &client,
1579 actor_name_2.clone(),
1580 "test unsubscribe".to_string(),
1581 )
1582 .await
1583 .unwrap();
1584
1585 let (sub_port_3, mut sub_rx_3) = client.open_port::<resource::State<ActorState>>();
1587 agent_ref
1588 .stream_state(&client, actor_name_2.clone(), sub_port_3.bind())
1589 .await
1590 .unwrap();
1591 loop {
1592 let state = sub_rx_3.recv().await.expect("subscriber 3 channel error");
1593 if state.status.is_terminating() {
1594 break;
1595 }
1596 }
1597
1598 let state = agent_ref
1600 .get_state(&client, actor_name_2.clone())
1601 .await
1602 .unwrap();
1603 assert!(
1604 state.status.is_terminating(),
1605 "expected terminating status, got {:?}",
1606 state.status,
1607 );
1608 }
1609
// Test-only actor that handles `BlockMsg`. Its `block` handler can be parked
// on a shared `tokio::sync::Notify` so that a test controls when it returns.
#[derive(Debug, Default, Serialize, Deserialize)]
#[hyperactor::export(handlers = [BlockMsg])]
struct BlockActor {
    // Optional gate. When `Some`, the `block` handler awaits `notified()` on
    // it; when `None` (the `Default`), `block` returns immediately. The field
    // is skipped by serde, so it is not part of the serialized form.
    #[serde(skip)]
    gate: Option<Arc<tokio::sync::Notify>>,
}
// Empty impl: no `Actor` trait items are overridden for this test actor.
impl hyperactor::Actor for BlockActor {}
1622
// Messages accepted by `BlockActor`. The `Handler` / `HandleClient` derives
// pair these variants with the `block` and `noop` methods implemented for
// `BlockActor`.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    hyperactor::Handler,
    hyperactor::HandleClient
)]
enum BlockMsg {
    // Handled by `block`: waits on the actor's gate when one is set.
    Block(),
    // Handled by `noop`: returns `Ok(())` without doing any work.
    Noop(),
}
// NOTE(review): presumably registers `BlockMsg` so it can be (de)serialized
// by type name on the wire — confirm against the `wirevalue` crate docs.
wirevalue::register_type!(BlockMsg);
1639
1640 #[async_trait::async_trait]
1641 #[hyperactor::handle(BlockMsg)]
1642 impl BlockMsgHandler for BlockActor {
1643 async fn block(&mut self, _cx: &hyperactor::Context<Self>) -> Result<(), anyhow::Error> {
1644 if let Some(gate) = &self.gate {
1645 gate.notified().await;
1646 }
1647 Ok(())
1648 }
1649 async fn noop(&mut self, _cx: &hyperactor::Context<Self>) -> Result<(), anyhow::Error> {
1650 Ok(())
1651 }
1652 }
1653
1654 #[tokio::test]
1662 async fn test_query_child_proc_queue_depth_under_pressure() {
1663 use hyperactor::Proc;
1664 use hyperactor::actor::ActorStatus;
1665 use hyperactor::channel::ChannelTransport;
1666 use hyperactor::introspect::IntrospectMessage;
1667 use hyperactor::introspect::IntrospectResult;
1668
1669 let proc = Proc::direct(ChannelTransport::Unix.any(), "qd_proc".to_string()).unwrap();
1670 let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
1671
1672 agent_handle
1673 .status()
1674 .wait_for(|s| matches!(s, ActorStatus::Idle))
1675 .await
1676 .unwrap();
1677
1678 let client_proc =
1679 Proc::direct(ChannelTransport::Unix.any(), "qd_client".to_string()).unwrap();
1680 let (client, _client_handle) = client_proc.client("client").unwrap();
1681
1682 let gate = Arc::new(tokio::sync::Notify::new());
1684 let blocker = proc
1685 .spawn(
1686 "blocker",
1687 BlockActor {
1688 gate: Some(Arc::clone(&gate)),
1689 },
1690 )
1691 .unwrap();
1692
1693 blocker.block(&client).await.unwrap();
1695 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1697 blocker.noop(&client).await.unwrap();
1698 blocker.noop(&client).await.unwrap();
1699
1700 let agent_id: ActorAddr = proc.proc_addr().actor_addr(PROC_AGENT_ACTOR_NAME);
1703 let port = PortRef::<IntrospectMessage>::attest_handler_port(&agent_id);
1704
1705 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
1707 loop {
1708 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1709 port.post(
1710 &client,
1711 IntrospectMessage::QueryChild {
1712 child_ref: Addr::Proc(proc.proc_addr().clone()),
1713 reply: reply_port.bind(),
1714 },
1715 );
1716 let payload = tokio::time::timeout(std::time::Duration::from_secs(3), reply_rx.recv())
1717 .await
1718 .expect("QueryChild timed out")
1719 .expect("reply channel closed");
1720
1721 let attrs: hyperactor_config::Attrs =
1722 serde_json::from_str(&payload.attrs).expect("valid attrs JSON");
1723
1724 let total = attrs
1725 .get(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_TOTAL)
1726 .copied()
1727 .unwrap_or(0);
1728 let max = attrs
1729 .get(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_MAX)
1730 .copied()
1731 .unwrap_or(0);
1732
1733 if total > 0 {
1734 assert!(max > 0, "max should be > 0 when total is {total}");
1735 assert!(max <= total, "PD-1: max ({max}) <= total ({total})");
1736 break;
1737 }
1738
1739 assert!(
1740 tokio::time::Instant::now() < deadline,
1741 "timed out waiting for non-zero queue depth in QueryChild(Proc)",
1742 );
1743 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1744 }
1745
1746 gate.notify_one();
1748 }
1749}