1#![allow(unused_assignments)]
16
17use std::collections::HashMap;
18use std::mem::take;
19use std::sync::Arc;
20use std::sync::Mutex;
21use std::sync::RwLock;
22use std::sync::RwLockReadGuard;
23use std::time::Duration;
24
25use async_trait::async_trait;
26use enum_as_inner::EnumAsInner;
27use hyperactor::Actor;
28use hyperactor::ActorHandle;
29use hyperactor::Bind;
30use hyperactor::Context;
31use hyperactor::Data;
32use hyperactor::HandleClient;
33use hyperactor::Handler;
34use hyperactor::Instance;
35use hyperactor::PortHandle;
36use hyperactor::RefClient;
37use hyperactor::Unbind;
38use hyperactor::actor::ActorStatus;
39use hyperactor::actor::remote::Remote;
40use hyperactor::channel;
41use hyperactor::channel::ChannelAddr;
42use hyperactor::mailbox::BoxedMailboxSender;
43use hyperactor::mailbox::DialMailboxRouter;
44use hyperactor::mailbox::IntoBoxedMailboxSender;
45use hyperactor::mailbox::MailboxClient;
46use hyperactor::mailbox::MailboxSender;
47use hyperactor::mailbox::MessageEnvelope;
48use hyperactor::mailbox::Undeliverable;
49use hyperactor::proc::Proc;
50use hyperactor::reference as hyperactor_reference;
51use hyperactor::supervision::ActorSupervisionEvent;
52use hyperactor_config::CONFIG;
53use hyperactor_config::ConfigAttr;
54use hyperactor_config::attrs::declare_attrs;
55use serde::Deserialize;
56use serde::Serialize;
57use typeuri::Named;
58
59use crate::Name;
60use crate::resource;
61
/// Name under which `ProcAgent::boot_v1` spawns the agent actor on its proc.
pub const PROC_AGENT_ACTOR_NAME: &str = "proc_agent";
64
declare_attrs! {
    // Configuration attribute read by `ProcAgent::boot_v1`; defaults to 60
    // seconds. A zero duration disables the periodic `SelfCheck`; any other
    // value is used as the delay between checks for actors whose keepalive
    // expiry has passed.
    //
    // NOTE(review): the two strings given to `ConfigAttr::new` are presumably
    // the environment-variable name and the config-file key -- confirm against
    // `hyperactor_config`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ORPHAN_TIMEOUT".to_string()),
        Some("mesh_orphan_timeout".to_string()),
    ))
    pub attr MESH_ORPHAN_TIMEOUT: Duration = Duration::from_secs(60);
}
73
/// Outcome of a `MeshAgentMessage::Gspawn` request, delivered on the
/// request's `status_port`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Named)]
pub enum GspawnResult {
    /// The actor was spawned.
    Success {
        /// Rank of the agent that performed the spawn.
        rank: usize,
        /// Id of the newly spawned actor.
        actor_id: hyperactor_reference::ActorId,
    },
    /// The spawn failed; the payload is a human-readable error message.
    Error(String),
}
wirevalue::register_type!(GspawnResult);
83
/// Outcome of a `MeshAgentMessage::StopActor` request.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub enum StopActorResult {
    /// The wait for the actor to reach a terminal status finished before the
    /// timeout elapsed.
    Success,
    /// The timeout elapsed before the actor reached a terminal status.
    Timeout,
    /// The proc had no actor to stop for the given id.
    NotFound,
}
wirevalue::register_type!(StopActorResult);
91
/// Self-message the agent schedules (with a short delay) after recording a
/// supervision event; its handler republishes the agent's introspection
/// attributes if they are still marked dirty.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
struct RepublishIntrospect;
wirevalue::register_type!(RepublishIntrospect);
108
109fn collect_live_children(proc: &hyperactor::Proc) -> (Vec<String>, Vec<String>) {
115 let all_keys = proc.all_instance_keys();
116 let mut children = Vec::with_capacity(all_keys.len());
117 let mut system_children = Vec::new();
118 for id in all_keys {
119 if let Some(cell) = proc.get_instance(&id) {
120 let ref_str = id.to_string();
121 if cell.is_system() {
122 system_children.push(ref_str.clone());
123 }
124 children.push(ref_str);
125 }
126 }
127 (children, system_children)
128}
129
/// Messages handled by `ProcAgent` through the generated
/// `MeshAgentMessageHandler` trait.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub(crate) enum MeshAgentMessage {
    /// Configure an agent that is still in the unconfigured v0 state. The
    /// handler rejects this message in any other state.
    Configure {
        /// Rank assigned to the agent; echoed back on `configured`.
        rank: usize,
        /// Address dialed to build the default route of the agent's router.
        forwarder: ChannelAddr,
        /// Port to which the agent forwards supervision events, if any.
        supervisor: Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>,
        /// Proc-to-address bindings installed in the agent's router.
        address_book: HashMap<hyperactor_reference::ProcId, ChannelAddr>,
        /// Port on which the agent sends `rank` once it is configured.
        configured: hyperactor_reference::PortRef<usize>,
        /// Whether the agent records the supervision events it receives.
        record_supervision_events: bool,
    },

    /// Query the agent's configuration status.
    Status {
        /// Receives `(rank, true)` when the agent is v0-configured; in every
        /// other state the handler returns an error instead of replying.
        status: hyperactor_reference::PortRef<(usize, bool)>,
    },

    /// Spawn an actor on the agent's proc (v0-configured agents only).
    Gspawn {
        /// Registered actor type name handed to the remote registry.
        actor_type: String,
        /// Name for the new actor.
        actor_name: String,
        /// Serialized spawn parameters.
        params_data: Data,
        /// Receives the spawn outcome.
        status_port: hyperactor_reference::PortRef<GspawnResult>,
    },

    /// Stop an actor on the agent's proc and wait for it to terminate.
    StopActor {
        /// Actor to stop.
        actor_id: hyperactor_reference::ActorId,
        /// Maximum time to wait for a terminal status, in milliseconds.
        timeout_ms: u64,
        /// Reason passed along with the stop request.
        reason: String,
        /// Receives the stop outcome.
        #[reply]
        stopped: hyperactor_reference::OncePortRef<StopActorResult>,
    },
}
191
/// Configuration state of a `ProcAgent`.
#[derive(Debug, EnumAsInner, Default)]
enum State {
    /// v0 agent created by `ProcAgent::bootstrap` that has not yet received
    /// `MeshAgentMessage::Configure`.
    UnconfiguredV0 {
        /// Sender backing the proc's forwarder; still awaiting configuration.
        sender: ReconfigurableMailboxSender,
    },

    /// v0 agent after a successful `Configure`.
    ConfiguredV0 {
        /// The (now configured) sender backing the proc's forwarder.
        sender: ReconfigurableMailboxSender,
        /// Rank assigned by `Configure`.
        rank: usize,
        /// Port to which supervision events are forwarded, if any.
        supervisor: Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>,
    },

    /// Agent created by `ProcAgent::boot_v1`; it has no rank or supervisor.
    V1,

    /// Placeholder left behind while the state is temporarily moved out with
    /// `take` (this is the `Default` variant).
    #[default]
    Invalid,
}
210
211impl State {
212 fn rank(&self) -> Option<usize> {
213 match self {
214 State::ConfiguredV0 { rank, .. } => Some(*rank),
215 _ => None,
216 }
217 }
218
219 fn supervisor(&self) -> Option<hyperactor_reference::PortRef<ActorSupervisionEvent>> {
220 match self {
221 State::ConfiguredV0 { supervisor, .. } => supervisor.clone(),
222 _ => None,
223 }
224 }
225}
226
/// Bookkeeping the agent keeps for each actor requested via
/// `resource::CreateOrUpdate`, keyed by resource name.
#[derive(Debug)]
struct ActorInstanceState {
    /// Rank carried by the `CreateOrUpdate` message that created this entry.
    create_rank: usize,
    /// Result of the spawn attempt: the actor id, or the error that prevented
    /// the spawn.
    spawn: Result<hyperactor_reference::ActorId, anyhow::Error>,
    /// Set once the agent has issued a stop for this actor.
    stopped: bool,
    /// Keepalive deadline set by `resource::KeepaliveGetState`; the periodic
    /// `SelfCheck` stops the actor once this time has passed. `None` means no
    /// keepalive has been registered.
    expiry_time: Option<std::time::SystemTime>,
}
239
/// Periodic self-message used by `ProcAgent` to look for actors whose
/// keepalive expiry has passed. It is first scheduled from `init` when an
/// orphan timeout is configured, and re-armed by its own handler.
#[derive(
    Clone,
    Debug,
    Default,
    PartialEq,
    Serialize,
    Deserialize,
    Named,
    Bind,
    Unbind
)]
struct SelfCheck {}
252
/// Agent actor that manages a single [`Proc`]: it spawns and stops actors on
/// it, records and forwards supervision events, answers resource state
/// queries, and publishes introspection attributes describing the proc.
#[hyperactor::export(
    handlers=[
        MeshAgentMessage,
        ActorSupervisionEvent,
        resource::CreateOrUpdate<ActorSpec> { cast = true },
        resource::Stop { cast = true },
        resource::StopAll { cast = true },
        resource::GetState<ActorState> { cast = true },
        resource::KeepaliveGetState<ActorState> { cast = true },
        resource::GetRankStatus { cast = true },
        RepublishIntrospect { cast = true },
    ]
)]
pub struct ProcAgent {
    /// The proc this agent manages.
    proc: Proc,
    /// Registry used to spawn actors by type name (`Remote::gspawn`).
    remote: Remote,
    /// Configuration state (v0 unconfigured/configured, or v1).
    state: State,
    /// Per-resource bookkeeping for actors created via `CreateOrUpdate`.
    actor_states: HashMap<Name, ActorInstanceState>,
    /// Whether incoming supervision events are stored in `supervision_events`.
    record_supervision_events: bool,
    /// Recorded supervision events, grouped by the actor they concern.
    supervision_events: HashMap<hyperactor_reference::ActorId, Vec<ActorSupervisionEvent>>,
    /// True while a `RepublishIntrospect` self-message is pending.
    introspect_dirty: bool,
    /// When present, `StopAll` reports its exit code here instead of exiting
    /// the process.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<i32>>,
    /// Period of the orphan `SelfCheck`; `None` disables the check.
    mesh_orphan_timeout: Option<Duration>,
}
308
309impl ProcAgent {
310 #[hyperactor::observe_result("MeshAgent")]
311 pub(crate) async fn bootstrap(
312 proc_id: hyperactor_reference::ProcId,
313 ) -> Result<(Proc, ActorHandle<Self>), anyhow::Error> {
314 let sender = ReconfigurableMailboxSender::new();
315 let proc = Proc::configured(proc_id.clone(), BoxedMailboxSender::new(sender.clone()));
316
317 let agent = ProcAgent {
318 proc: proc.clone(),
319 remote: Remote::collect(),
320 state: State::UnconfiguredV0 { sender },
321 actor_states: HashMap::new(),
322 record_supervision_events: false,
323 supervision_events: HashMap::new(),
324 introspect_dirty: false,
325 shutdown_tx: None,
326 mesh_orphan_timeout: None,
329 };
330 let handle = proc.spawn::<Self>("mesh", agent)?;
331 Ok((proc, handle))
332 }
333
334 pub(crate) fn boot_v1(
335 proc: Proc,
336 shutdown_tx: Option<tokio::sync::oneshot::Sender<i32>>,
337 ) -> Result<ActorHandle<Self>, anyhow::Error> {
338 let orphan_timeout = hyperactor_config::global::get(MESH_ORPHAN_TIMEOUT);
342 let orphan_timeout = if orphan_timeout.is_zero() {
343 None
344 } else {
345 Some(orphan_timeout)
346 };
347 let agent = ProcAgent {
348 proc: proc.clone(),
349 remote: Remote::collect(),
350 state: State::V1,
351 actor_states: HashMap::new(),
352 record_supervision_events: true,
353 supervision_events: HashMap::new(),
354 introspect_dirty: false,
355 shutdown_tx,
356 mesh_orphan_timeout: orphan_timeout,
357 };
358 proc.spawn::<Self>(PROC_AGENT_ACTOR_NAME, agent)
359 }
360
361 async fn destroy_and_wait_except_current<'a>(
362 &mut self,
363 cx: &Context<'a, Self>,
364 timeout: tokio::time::Duration,
365 reason: &str,
366 ) -> Result<
367 (
368 Vec<hyperactor_reference::ActorId>,
369 Vec<hyperactor_reference::ActorId>,
370 ),
371 anyhow::Error,
372 > {
373 self.proc
374 .destroy_and_wait_except_current::<Self>(timeout, Some(cx), true, reason)
375 .await
376 }
377
378 fn publish_introspect_properties(&self, cx: &impl hyperactor::context::Actor) {
381 let (mut children, mut system_children) = collect_live_children(&self.proc);
382
383 let mut stopped_children: Vec<String> = Vec::new();
387 for id in self.proc.all_terminated_actor_ids() {
388 let ref_str = id.to_string();
389 stopped_children.push(ref_str.clone());
390 if let Some(snapshot) = self.proc.terminated_snapshot(&id) {
393 let snapshot_attrs: hyperactor_config::Attrs =
394 serde_json::from_str(&snapshot.attrs).unwrap_or_default();
395 if snapshot_attrs
396 .get(hyperactor::introspect::IS_SYSTEM)
397 .copied()
398 .unwrap_or(false)
399 {
400 system_children.push(ref_str.clone());
401 }
402 }
403 if !children.contains(&ref_str) {
404 children.push(ref_str);
405 }
406 }
407
408 let stopped_retention_cap =
409 hyperactor_config::global::get(hyperactor::config::TERMINATED_SNAPSHOT_RETENTION);
410
411 let failed_actor_count = self.supervision_events.len();
413
414 let num_live = children.len();
416 let mut attrs = hyperactor_config::Attrs::new();
417 attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
418 attrs.set(
419 crate::introspect::PROC_NAME,
420 self.proc.proc_id().to_string(),
421 );
422 attrs.set(crate::introspect::NUM_ACTORS, num_live);
423 attrs.set(hyperactor::introspect::CHILDREN, children);
424 attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
425 attrs.set(crate::introspect::STOPPED_CHILDREN, stopped_children);
426 attrs.set(
427 crate::introspect::STOPPED_RETENTION_CAP,
428 stopped_retention_cap,
429 );
430 attrs.set(crate::introspect::IS_POISONED, failed_actor_count > 0);
431 attrs.set(crate::introspect::FAILED_ACTOR_COUNT, failed_actor_count);
432 cx.instance().publish_attrs(attrs);
433 }
434}
435
#[async_trait]
impl Actor for ProcAgent {
    /// Actor start-up: registers the agent as the proc's supervision
    /// coordinator, publishes the initial introspection attributes, installs
    /// the handler that answers child introspection queries, and (when an
    /// orphan timeout is configured) schedules the first `SelfCheck`.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        this.set_system();
        // Supervision events for the proc are delivered to this agent's port.
        self.proc.set_supervision_coordinator(this.port())?;
        self.publish_introspect_properties(this);

        // The handler closure owns its own copies of the proc and of the
        // agent's id; it cannot borrow `self`.
        let proc = self.proc.clone();
        let self_id = this.self_id().clone();
        this.set_query_child_handler(move |child_ref| {
            use hyperactor::introspect::IntrospectResult;

            // Actor reference: answer from the terminated snapshot, if any.
            if let hyperactor::reference::Reference::Actor(id) = child_ref {
                if let Some(snapshot) = proc.terminated_snapshot(id) {
                    return snapshot;
                }
            }

            // Proc reference naming this agent's own proc: build a proc-level
            // result from the current live and terminated actors.
            if let hyperactor::reference::Reference::Proc(proc_id) = child_ref {
                if proc_id == proc.proc_id() {
                    let (mut children, mut system_children) = collect_live_children(&proc);

                    let mut stopped_children: Vec<String> = Vec::new();
                    for id in proc.all_terminated_actor_ids() {
                        let ref_str = id.to_string();
                        stopped_children.push(ref_str.clone());
                        if let Some(snapshot) = proc.terminated_snapshot(&id) {
                            // Snapshot attrs that fail to parse count as empty.
                            let snapshot_attrs: hyperactor_config::Attrs =
                                serde_json::from_str(&snapshot.attrs).unwrap_or_default();
                            if snapshot_attrs
                                .get(hyperactor::introspect::IS_SYSTEM)
                                .copied()
                                .unwrap_or(false)
                            {
                                system_children.push(ref_str.clone());
                            }
                        }
                        if !children.contains(&ref_str) {
                            children.push(ref_str);
                        }
                    }

                    let stopped_retention_cap = hyperactor_config::global::get(
                        hyperactor::config::TERMINATED_SNAPSHOT_RETENTION,
                    );

                    // The failure counters live on the agent, which the
                    // closure cannot reach; read them back from the attrs the
                    // agent last published, defaulting to "not poisoned".
                    let (is_poisoned, failed_actor_count) = proc
                        .get_instance(&self_id)
                        .and_then(|cell| cell.published_attrs())
                        .map(|attrs| {
                            let is_poisoned = attrs
                                .get(crate::introspect::IS_POISONED)
                                .copied()
                                .unwrap_or(false);
                            let failed_actor_count = attrs
                                .get(crate::introspect::FAILED_ACTOR_COUNT)
                                .copied()
                                .unwrap_or(0);
                            (is_poisoned, failed_actor_count)
                        })
                        .unwrap_or((false, 0));

                    // Counts the merged child list (live plus terminated).
                    let num_live = children.len();
                    // The child list itself is returned through the
                    // `children` field below rather than as an attribute.
                    let mut attrs = hyperactor_config::Attrs::new();
                    attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
                    attrs.set(crate::introspect::PROC_NAME, proc_id.to_string());
                    attrs.set(crate::introspect::NUM_ACTORS, num_live);
                    attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
                    attrs.set(crate::introspect::STOPPED_CHILDREN, stopped_children);
                    attrs.set(
                        crate::introspect::STOPPED_RETENTION_CAP,
                        stopped_retention_cap,
                    );
                    attrs.set(crate::introspect::IS_POISONED, is_poisoned);
                    attrs.set(crate::introspect::FAILED_ACTOR_COUNT, failed_actor_count);
                    // Serialization failure degrades to an empty JSON object.
                    let attrs_json =
                        serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());

                    return IntrospectResult {
                        identity: proc_id.to_string(),
                        attrs: attrs_json,
                        children,
                        parent: None,
                        as_of: humantime::format_rfc3339_millis(std::time::SystemTime::now())
                            .to_string(),
                    };
                }
            }

            // Anything else: report a "not_found" error result.
            {
                let mut error_attrs = hyperactor_config::Attrs::new();
                error_attrs.set(hyperactor::introspect::ERROR_CODE, "not_found".to_string());
                error_attrs.set(
                    hyperactor::introspect::ERROR_MESSAGE,
                    format!("child {} not found", child_ref),
                );
                IntrospectResult {
                    identity: String::new(),
                    attrs: serde_json::to_string(&error_attrs).unwrap_or_else(|_| "{}".to_string()),
                    children: Vec::new(),
                    parent: None,
                    as_of: humantime::format_rfc3339_millis(std::time::SystemTime::now())
                        .to_string(),
                }
            }
        });

        // Arm the periodic orphan check only when a timeout is configured.
        if let Some(delay) = &self.mesh_orphan_timeout {
            this.self_message_with_delay(SelfCheck::default(), *delay)?;
        }
        Ok(())
    }
}
559
560#[async_trait]
561#[hyperactor::handle(MeshAgentMessage)]
562impl MeshAgentMessageHandler for ProcAgent {
563 async fn configure(
564 &mut self,
565 cx: &Context<Self>,
566 rank: usize,
567 forwarder: ChannelAddr,
568 supervisor: Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>,
569 address_book: HashMap<hyperactor_reference::ProcId, ChannelAddr>,
570 configured: hyperactor_reference::PortRef<usize>,
571 record_supervision_events: bool,
572 ) -> Result<(), anyhow::Error> {
573 anyhow::ensure!(
574 self.state.is_unconfigured_v0(),
575 "mesh agent cannot be (re-)configured"
576 );
577 self.record_supervision_events = record_supervision_events;
578
579 let client = MailboxClient::new(channel::dial(forwarder)?);
580 let router =
581 DialMailboxRouter::new_with_default_direct_addressed_remote_only(client.into_boxed());
582
583 for (proc_id, addr) in address_book {
584 router.bind(proc_id.into(), addr);
585 }
586
587 let sender = take(&mut self.state).into_unconfigured_v0().unwrap();
588 assert!(sender.configure(router.into_boxed()));
589
590 self.state = State::ConfiguredV0 {
594 sender,
595 rank,
596 supervisor,
597 };
598 configured.send(cx, rank)?;
599
600 Ok(())
601 }
602
603 async fn gspawn(
604 &mut self,
605 cx: &Context<Self>,
606 actor_type: String,
607 actor_name: String,
608 params_data: Data,
609 status_port: hyperactor_reference::PortRef<GspawnResult>,
610 ) -> Result<(), anyhow::Error> {
611 anyhow::ensure!(
612 self.state.is_configured_v0(),
613 "mesh agent is not v0 configured"
614 );
615 let actor_id = match self
616 .remote
617 .gspawn(
618 &self.proc,
619 &actor_type,
620 &actor_name,
621 params_data,
622 cx.headers().clone(),
623 )
624 .await
625 {
626 Ok(id) => id,
627 Err(err) => {
628 status_port.send(cx, GspawnResult::Error(format!("gspawn failed: {}", err)))?;
629 return Err(anyhow::anyhow!("gspawn failed"));
630 }
631 };
632 status_port.send(
633 cx,
634 GspawnResult::Success {
635 rank: self.state.rank().unwrap(),
636 actor_id,
637 },
638 )?;
639 self.publish_introspect_properties(cx);
640 Ok(())
641 }
642
643 async fn stop_actor(
644 &mut self,
645 cx: &Context<Self>,
646 actor_id: hyperactor_reference::ActorId,
647 timeout_ms: u64,
648 reason: String,
649 ) -> Result<StopActorResult, anyhow::Error> {
650 tracing::info!(
651 name = "StopActor",
652 %actor_id,
653 actor_name = actor_id.name(),
654 %reason,
655 );
656
657 let result = if let Some(mut status) = self.proc.stop_actor(&actor_id, reason) {
658 match tokio::time::timeout(
659 tokio::time::Duration::from_millis(timeout_ms),
660 status.wait_for(|state: &ActorStatus| state.is_terminal()),
661 )
662 .await
663 {
664 Ok(_) => Ok(StopActorResult::Success),
665 Err(_) => Ok(StopActorResult::Timeout),
666 }
667 } else {
668 Ok(StopActorResult::NotFound)
669 };
670 self.publish_introspect_properties(cx);
671 result
672 }
673
674 async fn status(
675 &mut self,
676 cx: &Context<Self>,
677 status_port: hyperactor_reference::PortRef<(usize, bool)>,
678 ) -> Result<(), anyhow::Error> {
679 match &self.state {
680 State::ConfiguredV0 { rank, .. } => {
681 status_port.send(cx, (*rank, true))?;
683 Ok(())
684 }
685 State::UnconfiguredV0 { .. } => {
686 Err(anyhow::anyhow!(
688 "status unavailable: v0 agent not configured (waiting for Configure)"
689 ))
690 }
691 State::V1 => {
692 Err(anyhow::anyhow!(
694 "status unsupported in v1/owned path (no rank)"
695 ))
696 }
697 State::Invalid => Err(anyhow::anyhow!(
698 "status unavailable: agent in invalid state"
699 )),
700 }
701 }
702}
703
704#[async_trait]
705impl Handler<ActorSupervisionEvent> for ProcAgent {
706 async fn handle(
707 &mut self,
708 cx: &Context<Self>,
709 event: ActorSupervisionEvent,
710 ) -> anyhow::Result<()> {
711 if self.record_supervision_events {
712 if event.is_error() {
713 tracing::warn!(
714 name = "SupervisionEvent",
715 proc_id = %self.proc.proc_id(),
716 %event,
717 "recording supervision error",
718 );
719 } else {
720 tracing::debug!(
721 name = "SupervisionEvent",
722 proc_id = %self.proc.proc_id(),
723 %event,
724 "recording non-error supervision event",
725 );
726 }
727 self.supervision_events
728 .entry(event.actor_id.clone())
729 .or_default()
730 .push(event.clone());
731 if !self.introspect_dirty {
735 self.introspect_dirty = true;
736 let _ = cx.self_message_with_delay(
737 RepublishIntrospect,
738 std::time::Duration::from_millis(100),
739 );
740 }
741 }
742 if let Some(supervisor) = self.state.supervisor() {
743 supervisor.send(cx, event)?;
744 } else if !self.record_supervision_events {
745 tracing::error!(
748 name = "supervision_event_transmit_failed",
749 proc_id = %cx.self_id().proc_id(),
750 %event,
751 "could not propagate supervision event, crashing",
752 );
753
754 std::process::exit(1);
757 }
758 Ok(())
759 }
760}
761
762#[async_trait]
763impl Handler<RepublishIntrospect> for ProcAgent {
764 async fn handle(&mut self, cx: &Context<Self>, _: RepublishIntrospect) -> anyhow::Result<()> {
765 if self.introspect_dirty {
766 self.introspect_dirty = false;
767 self.publish_introspect_properties(cx);
768 }
769 Ok(())
770 }
771}
772
/// Specification of an actor to create through
/// `resource::CreateOrUpdate<ActorSpec>`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct ActorSpec {
    /// Registered actor type name used to look up the spawn function.
    pub actor_type: String,
    /// Serialized parameters handed to the spawn.
    pub params_data: Data,
}
wirevalue::register_type!(ActorSpec);
784
/// State reported for a successfully spawned actor in replies to
/// `resource::GetState<ActorState>`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct ActorState {
    /// Id of the spawned actor.
    pub actor_id: hyperactor_reference::ActorId,
    /// Rank carried by the `CreateOrUpdate` message that created the actor.
    pub create_rank: usize,
    /// Supervision events the agent has recorded for this actor.
    pub supervision_events: Vec<ActorSupervisionEvent>,
}
wirevalue::register_type!(ActorState);
796
797#[async_trait]
798impl Handler<resource::CreateOrUpdate<ActorSpec>> for ProcAgent {
799 async fn handle(
800 &mut self,
801 cx: &Context<Self>,
802 create_or_update: resource::CreateOrUpdate<ActorSpec>,
803 ) -> anyhow::Result<()> {
804 if self.actor_states.contains_key(&create_or_update.name) {
805 return Ok(());
807 }
808 let create_rank = create_or_update.rank.unwrap();
809 if !self.supervision_events.is_empty() {
813 self.actor_states.insert(
814 create_or_update.name.clone(),
815 ActorInstanceState {
816 spawn: Err(anyhow::anyhow!(
817 "Cannot spawn new actors on mesh with supervision events"
818 )),
819 create_rank,
820 stopped: false,
821 expiry_time: None,
822 },
823 );
824 return Ok(());
825 }
826
827 let ActorSpec {
828 actor_type,
829 params_data,
830 } = create_or_update.spec;
831 self.actor_states.insert(
832 create_or_update.name.clone(),
833 ActorInstanceState {
834 create_rank,
835 spawn: self
836 .remote
837 .gspawn(
838 &self.proc,
839 &actor_type,
840 &create_or_update.name.to_string(),
841 params_data,
842 cx.headers().clone(),
843 )
844 .await,
845 stopped: false,
846 expiry_time: None,
847 },
848 );
849
850 self.publish_introspect_properties(cx);
851 Ok(())
852 }
853}
854
855#[async_trait]
856impl Handler<resource::Stop> for ProcAgent {
857 async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
858 let actor = self.actor_states.get_mut(&message.name);
861 let actor_id = match actor {
864 Some(actor_state) => {
865 match &actor_state.spawn {
866 Ok(actor_id) => {
867 if actor_state.stopped {
868 None
869 } else {
870 actor_state.stopped = true;
871 Some(actor_id.clone())
872 }
873 }
874 Err(_) => None,
877 }
878 }
879 None => None,
881 };
882 let timeout = hyperactor_config::global::get(hyperactor::config::STOP_ACTOR_TIMEOUT);
883 if let Some(actor_id) = actor_id {
884 self.stop_actor(cx, actor_id, timeout.as_millis() as u64, message.reason)
889 .await
890 .expect("stop_actor cannot fail");
891 }
892
893 Ok(())
894 }
895}
896
897#[async_trait]
902impl Handler<resource::StopAll> for ProcAgent {
903 async fn handle(
904 &mut self,
905 cx: &Context<Self>,
906 message: resource::StopAll,
907 ) -> anyhow::Result<()> {
908 let timeout = hyperactor_config::global::get(hyperactor::config::STOP_ACTOR_TIMEOUT);
909 let stop_result = self
912 .destroy_and_wait_except_current(cx, timeout, &message.reason)
913 .await;
914 match stop_result {
919 Ok((stopped_actors, aborted_actors)) => {
920 tracing::info!(
922 actor = %cx.self_id(),
923 "exiting process after receiving StopAll message on ProcAgent. \
924 stopped actors = {:?}, aborted actors = {:?}",
925 stopped_actors.into_iter().map(|a| a.to_string()).collect::<Vec<_>>(),
926 aborted_actors.into_iter().map(|a| a.to_string()).collect::<Vec<_>>(),
927 );
928 if let Some(tx) = self.shutdown_tx.take() {
929 let _ = tx.send(0);
930 return Ok(());
931 }
932 std::process::exit(0);
933 }
934 Err(e) => {
935 tracing::error!(actor = %cx.self_id(), "failed to stop all actors on ProcAgent: {:?}", e);
936 if let Some(tx) = self.shutdown_tx.take() {
937 let _ = tx.send(1);
938 return Ok(());
939 }
940 std::process::exit(1);
941 }
942 }
943 }
944}
945
946#[async_trait]
947impl Handler<resource::GetRankStatus> for ProcAgent {
948 async fn handle(
949 &mut self,
950 cx: &Context<Self>,
951 get_rank_status: resource::GetRankStatus,
952 ) -> anyhow::Result<()> {
953 use crate::StatusOverlay;
954 use crate::resource::Status;
955
956 let (rank, status) = match self.actor_states.get(&get_rank_status.name) {
957 Some(ActorInstanceState {
958 spawn: Ok(actor_id),
959 create_rank,
960 stopped,
961 ..
962 }) => {
963 if *stopped {
964 (*create_rank, resource::Status::Stopped)
965 } else {
966 let supervision_events = self
967 .supervision_events
968 .get(actor_id)
969 .map_or_else(Vec::new, |a| a.clone());
970 (
971 *create_rank,
972 if supervision_events.is_empty() {
973 resource::Status::Running
974 } else {
975 resource::Status::Failed(format!(
976 "because of supervision events: {:?}",
977 supervision_events
978 ))
979 },
980 )
981 }
982 }
983 Some(ActorInstanceState {
984 spawn: Err(e),
985 create_rank,
986 ..
987 }) => (*create_rank, Status::Failed(e.to_string())),
988 None => (usize::MAX, Status::NotExist),
990 };
991
992 let overlay = if rank == usize::MAX {
995 StatusOverlay::new()
996 } else {
997 StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
998 .expect("valid single-run overlay")
999 };
1000 let result = get_rank_status.reply.send(cx, overlay);
1001 if let Err(e) = result {
1005 tracing::warn!(
1006 actor = %cx.self_id(),
1007 "failed to send GetRankStatus reply to {} due to error: {}",
1008 get_rank_status.reply.port_id().actor_id(),
1009 e
1010 );
1011 }
1012 Ok(())
1013 }
1014}
1015
1016#[async_trait]
1017impl Handler<resource::GetState<ActorState>> for ProcAgent {
1018 async fn handle(
1019 &mut self,
1020 cx: &Context<Self>,
1021 get_state: resource::GetState<ActorState>,
1022 ) -> anyhow::Result<()> {
1023 let state = match self.actor_states.get(&get_state.name) {
1024 Some(ActorInstanceState {
1025 create_rank,
1026 spawn: Ok(actor_id),
1027 stopped,
1028 ..
1029 }) => {
1030 let supervision_events = self
1031 .supervision_events
1032 .get(actor_id)
1033 .map_or_else(Vec::new, |a| a.clone());
1034 let status = if *stopped {
1035 resource::Status::Stopped
1036 } else if supervision_events.is_empty() {
1037 resource::Status::Running
1038 } else {
1039 resource::Status::Failed(format!(
1040 "because of supervision events: {:?}",
1041 supervision_events
1042 ))
1043 };
1044 resource::State {
1045 name: get_state.name.clone(),
1046 status,
1047 state: Some(ActorState {
1048 actor_id: actor_id.clone(),
1049 create_rank: *create_rank,
1050 supervision_events,
1051 }),
1052 }
1053 }
1054 Some(ActorInstanceState { spawn: Err(e), .. }) => resource::State {
1055 name: get_state.name.clone(),
1056 status: resource::Status::Failed(e.to_string()),
1057 state: None,
1058 },
1059 None => resource::State {
1060 name: get_state.name.clone(),
1061 status: resource::Status::NotExist,
1062 state: None,
1063 },
1064 };
1065
1066 let result = get_state.reply.send(cx, state);
1067 if let Err(e) = result {
1071 tracing::warn!(
1072 actor = %cx.self_id(),
1073 "failed to send GetState reply to {} due to error: {}",
1074 get_state.reply.port_id().actor_id(),
1075 e
1076 );
1077 }
1078 Ok(())
1079 }
1080}
1081
1082#[async_trait]
1083impl Handler<resource::KeepaliveGetState<ActorState>> for ProcAgent {
1084 async fn handle(
1085 &mut self,
1086 cx: &Context<Self>,
1087 message: resource::KeepaliveGetState<ActorState>,
1088 ) -> anyhow::Result<()> {
1089 if let Ok(instance_state) = self
1091 .actor_states
1092 .get_mut(&message.get_state.name)
1093 .ok_or_else(|| {
1094 anyhow::anyhow!(
1095 "attempting to register a keepalive for an actor that doesn't exist: {}",
1096 message.get_state.name
1097 )
1098 })
1099 {
1100 instance_state.expiry_time = Some(message.expires_after);
1101 }
1102
1103 <Self as Handler<resource::GetState<ActorState>>>::handle(self, cx, message.get_state).await
1105 }
1106}
1107
/// Request for a fresh client instance on the agent's proc. The reply port
/// is a local `PortHandle`.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct NewClientInstance {
    /// Port that receives the newly created instance.
    #[reply]
    pub client_instance: PortHandle<Instance<()>>,
}
1115
1116#[async_trait]
1117impl Handler<NewClientInstance> for ProcAgent {
1118 async fn handle(
1119 &mut self,
1120 cx: &Context<Self>,
1121 NewClientInstance { client_instance }: NewClientInstance,
1122 ) -> anyhow::Result<()> {
1123 let (instance, _handle) = self.proc.instance("client")?;
1124 client_instance.send(cx, instance)?;
1125 Ok(())
1126 }
1127}
1128
/// Request for a clone of the agent's [`Proc`]. The reply port is a local
/// `PortHandle`.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct GetProc {
    /// Port that receives the proc.
    #[reply]
    pub proc: PortHandle<Proc>,
}
1136
1137#[async_trait]
1138impl Handler<GetProc> for ProcAgent {
1139 async fn handle(
1140 &mut self,
1141 cx: &Context<Self>,
1142 GetProc { proc }: GetProc,
1143 ) -> anyhow::Result<()> {
1144 proc.send(cx, self.proc.clone())?;
1145 Ok(())
1146 }
1147}
1148
1149#[async_trait]
1150impl Handler<SelfCheck> for ProcAgent {
1151 async fn handle(&mut self, cx: &Context<Self>, _: SelfCheck) -> anyhow::Result<()> {
1152 let Some(duration) = &self.mesh_orphan_timeout else {
1158 return Ok(());
1159 };
1160 let duration = duration.clone();
1161 let now = std::time::SystemTime::now();
1162
1163 let expired: Vec<(Name, hyperactor_reference::ActorId)> = self
1165 .actor_states
1166 .iter()
1167 .filter_map(|(name, state)| {
1168 let expiry = state.expiry_time?;
1169 if now > expiry && !state.stopped {
1171 if let Ok(actor_id) = &state.spawn {
1172 return Some((name.clone(), actor_id.clone()));
1173 }
1174 }
1175 None
1176 })
1177 .collect();
1178
1179 if !expired.is_empty() {
1180 tracing::info!(
1181 "stopping {} orphaned actors past their keepalive expiry",
1182 expired.len(),
1183 );
1184 }
1185
1186 let timeout = hyperactor_config::global::get(hyperactor::config::STOP_ACTOR_TIMEOUT);
1187 for (name, actor_id) in expired {
1188 if let Some(state) = self.actor_states.get_mut(&name) {
1189 state.stopped = true;
1190 }
1191 self.stop_actor(
1192 cx,
1193 actor_id,
1194 timeout.as_millis() as u64,
1195 "orphaned".to_string(),
1196 )
1197 .await
1198 .expect("stop_actor cannot fail");
1199 }
1200
1201 cx.self_message_with_delay(SelfCheck::default(), duration)?;
1203 Ok(())
1204 }
1205}
1206
/// A mailbox sender that queues posted messages until it is configured with
/// a concrete sender, and delegates to that sender afterwards. Clones share
/// the same underlying state.
#[derive(Clone)]
pub(crate) struct ReconfigurableMailboxSender {
    /// Shared state: either the pending queue or the configured sender.
    state: Arc<RwLock<ReconfigurableMailboxSenderState>>,
}
1213
1214impl std::fmt::Debug for ReconfigurableMailboxSender {
1215 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1216 f.debug_struct("ReconfigurableMailboxSender").finish()
1219 }
1220}
1221
/// Read-locked view of a [`ReconfigurableMailboxSender`], handed out by
/// `ReconfigurableMailboxSender::as_inner` once the sender is configured.
/// The read lock is held for as long as this value is alive.
pub(crate) struct ReconfigurableMailboxSenderInner<'a> {
    /// Read guard over the sender's state.
    guard: RwLockReadGuard<'a, ReconfigurableMailboxSenderState>,
}
1237
1238impl<'a> ReconfigurableMailboxSenderInner<'a> {
1239 pub(crate) fn as_configured(&self) -> Option<&BoxedMailboxSender> {
1240 self.guard.as_configured()
1241 }
1242}
1243
/// A message awaiting delivery: the envelope plus the port used to return it
/// if it turns out to be undeliverable.
type Post = (MessageEnvelope, PortHandle<Undeliverable<MessageEnvelope>>);

/// Internal state of a [`ReconfigurableMailboxSender`].
#[derive(EnumAsInner, Debug)]
enum ReconfigurableMailboxSenderState {
    /// Not configured yet: posts are appended to this queue. The inner mutex
    /// allows pushing while only the outer read lock is held.
    Queueing(Mutex<Vec<Post>>),
    /// Configured: posts are delegated to this sender.
    Configured(BoxedMailboxSender),
}
1251
impl ReconfigurableMailboxSender {
    /// Create a sender in the queueing state with an empty queue.
    pub(crate) fn new() -> Self {
        Self {
            state: Arc::new(RwLock::new(ReconfigurableMailboxSenderState::Queueing(
                Mutex::new(Vec::new()),
            ))),
        }
    }

    /// Install `sender` and flush every queued message through it, in the
    /// order the messages were queued.
    ///
    /// Returns `false` (leaving the state untouched) if a sender was already
    /// installed, `true` otherwise.
    ///
    /// # Panics
    /// Panics if the state lock or the queue mutex is poisoned.
    pub(crate) fn configure(&self, sender: BoxedMailboxSender) -> bool {
        // The write guard lives until this function returns, so `post` and
        // `post_unchecked` (which take the read lock) wait until the queue
        // has been flushed.
        let mut state = self.state.write().unwrap();
        if state.is_configured() {
            return false;
        }

        // Swap in the configured state and take ownership of the old
        // (queueing) state.
        let queued = std::mem::replace(
            &mut *state,
            ReconfigurableMailboxSenderState::Configured(sender),
        );

        let configured_sender = state.as_configured().expect("just configured");

        // `queued` must be the `Queueing` variant because the configured case
        // returned early above. Note that every queued message is flushed
        // with `post`, including those that were queued via `post_unchecked`.
        for (envelope, return_handle) in queued.into_queueing().unwrap().into_inner().unwrap() {
            configured_sender.post(envelope, return_handle);
        }

        true
    }

    /// Borrow the configured sender behind a read lock.
    ///
    /// # Errors
    /// Fails if the sender has not been configured yet.
    ///
    /// # Panics
    /// Panics if the state lock is poisoned.
    pub(crate) fn as_inner<'a>(
        &'a self,
    ) -> Result<ReconfigurableMailboxSenderInner<'a>, anyhow::Error> {
        let state = self.state.read().unwrap();
        if state.is_configured() {
            Ok(ReconfigurableMailboxSenderInner { guard: state })
        } else {
            Err(anyhow::anyhow!("cannot get inner sender: not configured"))
        }
    }
}
1300
1301impl MailboxSender for ReconfigurableMailboxSender {
1302 fn post(
1303 &self,
1304 envelope: MessageEnvelope,
1305 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1306 ) {
1307 match &*self.state.read().unwrap() {
1308 ReconfigurableMailboxSenderState::Queueing(queue) => {
1309 queue.lock().unwrap().push((envelope, return_handle));
1310 }
1311 ReconfigurableMailboxSenderState::Configured(sender) => {
1312 sender.post(envelope, return_handle);
1313 }
1314 }
1315 }
1316
1317 fn post_unchecked(
1318 &self,
1319 envelope: MessageEnvelope,
1320 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1321 ) {
1322 match &*self.state.read().unwrap() {
1323 ReconfigurableMailboxSenderState::Queueing(queue) => {
1324 queue.lock().unwrap().push((envelope, return_handle));
1325 }
1326 ReconfigurableMailboxSenderState::Configured(sender) => {
1327 sender.post_unchecked(envelope, return_handle);
1328 }
1329 }
1330 }
1331}
1332
1333#[cfg(test)]
1334mod tests {
1335 use std::sync::Arc;
1336 use std::sync::Mutex;
1337
1338 use hyperactor::mailbox::BoxedMailboxSender;
1339 use hyperactor::mailbox::Mailbox;
1340 use hyperactor::mailbox::MailboxSender;
1341 use hyperactor::mailbox::MessageEnvelope;
1342 use hyperactor::mailbox::PortHandle;
1343 use hyperactor::mailbox::Undeliverable;
1344 use hyperactor::testing::ids::test_actor_id;
1345 use hyperactor::testing::ids::test_port_id;
1346 use hyperactor_config::Flattrs;
1347
1348 use super::*;
1349
1350 #[derive(Debug, Clone)]
1351 struct QueueingMailboxSender {
1352 messages: Arc<Mutex<Vec<MessageEnvelope>>>,
1353 }
1354
1355 impl QueueingMailboxSender {
1356 fn new() -> Self {
1357 Self {
1358 messages: Arc::new(Mutex::new(Vec::new())),
1359 }
1360 }
1361
1362 fn get_messages(&self) -> Vec<MessageEnvelope> {
1363 self.messages.lock().unwrap().clone()
1364 }
1365 }
1366
1367 impl MailboxSender for QueueingMailboxSender {
1368 fn post_unchecked(
1369 &self,
1370 envelope: MessageEnvelope,
1371 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1372 ) {
1373 self.messages.lock().unwrap().push(envelope);
1374 }
1375 }
1376
1377 fn envelope(data: u64) -> MessageEnvelope {
1379 MessageEnvelope::serialize(
1380 test_actor_id("world_0", "sender"),
1381 test_port_id("world_0", "receiver", 1),
1382 &data,
1383 Flattrs::new(),
1384 )
1385 .unwrap()
1386 }
1387
1388 fn return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
1389 let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
1390 let (port, _receiver) = mbox.open_port::<Undeliverable<MessageEnvelope>>();
1391 port
1392 }
1393
1394 #[test]
1395 fn test_queueing_before_configure() {
1396 let sender = ReconfigurableMailboxSender::new();
1397
1398 let test_sender = QueueingMailboxSender::new();
1399 let boxed_sender = BoxedMailboxSender::new(test_sender.clone());
1400
1401 let return_handle = return_handle();
1402 sender.post(envelope(1), return_handle.clone());
1403 sender.post(envelope(2), return_handle.clone());
1404
1405 assert_eq!(test_sender.get_messages().len(), 0);
1406
1407 sender.configure(boxed_sender);
1408
1409 let messages = test_sender.get_messages();
1410 assert_eq!(messages.len(), 2);
1411
1412 assert_eq!(messages[0].deserialized::<u64>().unwrap(), 1);
1413 assert_eq!(messages[1].deserialized::<u64>().unwrap(), 2);
1414 }
1415
1416 #[test]
1417 fn test_direct_delivery_after_configure() {
1418 let sender = ReconfigurableMailboxSender::new();
1420
1421 let test_sender = QueueingMailboxSender::new();
1422 let boxed_sender = BoxedMailboxSender::new(test_sender.clone());
1423 sender.configure(boxed_sender);
1424
1425 let return_handle = return_handle();
1426 sender.post(envelope(3), return_handle.clone());
1427 sender.post(envelope(4), return_handle.clone());
1428
1429 let messages = test_sender.get_messages();
1430 assert_eq!(messages.len(), 2);
1431
1432 assert_eq!(messages[0].deserialized::<u64>().unwrap(), 3);
1433 assert_eq!(messages[1].deserialized::<u64>().unwrap(), 4);
1434 }
1435
1436 #[test]
1437 fn test_multiple_configurations() {
1438 let sender = ReconfigurableMailboxSender::new();
1439 let boxed_sender = BoxedMailboxSender::new(QueueingMailboxSender::new());
1440
1441 assert!(sender.configure(boxed_sender.clone()));
1442 assert!(!sender.configure(boxed_sender));
1443 }
1444
1445 #[test]
1446 fn test_mixed_queueing_and_direct_delivery() {
1447 let sender = ReconfigurableMailboxSender::new();
1448
1449 let test_sender = QueueingMailboxSender::new();
1450 let boxed_sender = BoxedMailboxSender::new(test_sender.clone());
1451
1452 let return_handle = return_handle();
1453 sender.post(envelope(5), return_handle.clone());
1454 sender.post(envelope(6), return_handle.clone());
1455
1456 sender.configure(boxed_sender);
1457
1458 sender.post(envelope(7), return_handle.clone());
1459 sender.post(envelope(8), return_handle.clone());
1460
1461 let messages = test_sender.get_messages();
1462 assert_eq!(messages.len(), 4);
1463
1464 assert_eq!(messages[0].deserialized::<u64>().unwrap(), 5);
1465 assert_eq!(messages[1].deserialized::<u64>().unwrap(), 6);
1466 assert_eq!(messages[2].deserialized::<u64>().unwrap(), 7);
1467 assert_eq!(messages[3].deserialized::<u64>().unwrap(), 8);
1468 }
1469
1470 #[derive(Debug, Default)]
1472 #[hyperactor::export(handlers = [])]
1473 struct ExtraActor;
1474 impl hyperactor::Actor for ExtraActor {}
1475
1476 #[tokio::test]
1490 async fn test_query_child_proc_returns_live_children() {
1491 use hyperactor::Proc;
1492 use hyperactor::actor::ActorStatus;
1493 use hyperactor::channel::ChannelTransport;
1494 use hyperactor::introspect::IntrospectMessage;
1495 use hyperactor::introspect::IntrospectResult;
1496 use hyperactor::reference as hyperactor_reference;
1497
1498 let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
1499 let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
1500
1501 agent_handle
1503 .status()
1504 .wait_for(|s| matches!(s, ActorStatus::Idle))
1505 .await
1506 .unwrap();
1507
1508 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1510 let (client, _client_handle) = client_proc.instance("client").unwrap();
1511
1512 let agent_id = proc.proc_id().actor_id(PROC_AGENT_ACTOR_NAME, 0);
1513 let port =
1514 hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(&agent_id);
1515
1516 let query = |client: &hyperactor::Instance<()>| {
1519 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1520 port.send(
1521 client,
1522 IntrospectMessage::QueryChild {
1523 child_ref: hyperactor_reference::Reference::Proc(proc.proc_id().clone()),
1524 reply: reply_port.bind(),
1525 },
1526 )
1527 .unwrap();
1528 reply_rx
1529 };
1530 let recv = |rx: hyperactor::mailbox::OncePortReceiver<IntrospectResult>| async move {
1531 tokio::time::timeout(std::time::Duration::from_secs(5), rx.recv())
1532 .await
1533 .expect("QueryChild(Proc) timed out — reply never delivered")
1534 .expect("reply channel closed")
1535 };
1536
1537 let payload = recv(query(&client)).await;
1539 let attrs: hyperactor_config::Attrs =
1541 serde_json::from_str(&payload.attrs).expect("valid attrs JSON");
1542 assert_eq!(
1543 attrs.get(crate::introspect::NODE_TYPE).map(String::as_str),
1544 Some("proc"),
1545 "expected node_type=proc in attrs, got {:?}",
1546 payload.attrs
1547 );
1548 assert!(
1549 payload
1550 .children
1551 .iter()
1552 .any(|c| c.contains(PROC_AGENT_ACTOR_NAME)),
1553 "initial children {:?} should contain proc_agent",
1554 payload.children
1555 );
1556 let initial_count = payload.children.len();
1557
1558 proc.spawn("extra_actor", ExtraActor).unwrap();
1562
1563 let payload2 = recv(query(&client)).await;
1565 let attrs2: hyperactor_config::Attrs =
1566 serde_json::from_str(&payload2.attrs).expect("valid attrs JSON");
1567 assert_eq!(
1568 attrs2.get(crate::introspect::NODE_TYPE).map(String::as_str),
1569 Some("proc"),
1570 "expected node_type=proc in attrs, got {:?}",
1571 payload2.attrs
1572 );
1573 assert!(
1574 payload2.children.iter().any(|c| c.contains("extra_actor")),
1575 "after direct spawn, children {:?} should contain extra_actor",
1576 payload2.children
1577 );
1578 assert!(
1579 payload2.children.len() > initial_count,
1580 "expected at least {} children after direct spawn, got {:?}",
1581 initial_count + 1,
1582 payload2.children
1583 );
1584 }
1585
1586 #[tokio::test]
1593 async fn test_rapid_spawn_stop_does_not_stall_proc_agent() {
1594 use std::sync::Arc;
1595 use std::sync::atomic::AtomicUsize;
1596 use std::sync::atomic::Ordering;
1597
1598 use hyperactor::Proc;
1599 use hyperactor::actor::ActorStatus;
1600 use hyperactor::channel::ChannelTransport;
1601 use hyperactor::introspect::IntrospectMessage;
1602 use hyperactor::introspect::IntrospectResult;
1603 use hyperactor::reference as hyperactor_reference;
1604
1605 let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
1606 let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
1607
1608 agent_handle
1609 .status()
1610 .wait_for(|s| matches!(s, ActorStatus::Idle))
1611 .await
1612 .unwrap();
1613
1614 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1615 let (client, _client_handle) = client_proc.instance("client").unwrap();
1616
1617 let agent_id = proc.proc_id().actor_id(PROC_AGENT_ACTOR_NAME, 0);
1618 let port =
1619 hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(&agent_id);
1620
1621 let query_client_proc =
1623 Proc::direct(ChannelTransport::Unix.any(), "query_client".to_string()).unwrap();
1624 let (query_client, _qc_handle) = query_client_proc.instance("qc").unwrap();
1625 let query_port = port.clone();
1626 let query_proc_id = proc.proc_id().clone();
1627 let query_count = Arc::new(AtomicUsize::new(0));
1628 let query_count_clone = query_count.clone();
1629 let query_task = tokio::spawn(async move {
1630 loop {
1631 let (reply_port, reply_rx) = query_client.open_once_port::<IntrospectResult>();
1632 if query_port
1633 .send(
1634 &query_client,
1635 IntrospectMessage::QueryChild {
1636 child_ref: hyperactor_reference::Reference::Proc(query_proc_id.clone()),
1637 reply: reply_port.bind(),
1638 },
1639 )
1640 .is_err()
1641 {
1642 break;
1643 }
1644 match tokio::time::timeout(std::time::Duration::from_secs(2), reply_rx.recv()).await
1645 {
1646 Ok(Ok(_)) => {
1647 query_count_clone.fetch_add(1, Ordering::Relaxed);
1648 }
1649 _ => {} }
1651 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1652 }
1653 });
1654
1655 const ITERATIONS: usize = 200;
1657 let mut completed = 0usize;
1658 let result = tokio::time::timeout(std::time::Duration::from_secs(30), async {
1659 for i in 0..ITERATIONS {
1660 let name = format!("churn_{}", i);
1661 let handle = proc.spawn(&name, ExtraActor).unwrap();
1662 let actor_id = handle.actor_id().clone();
1663 if let Some(mut status) = proc.stop_actor(&actor_id, "churn".to_string()) {
1664 let _ = tokio::time::timeout(
1665 std::time::Duration::from_secs(5),
1666 status.wait_for(ActorStatus::is_terminal),
1667 )
1668 .await;
1669 }
1670 completed += 1;
1671 }
1672 })
1673 .await;
1674
1675 query_task.abort();
1676 let _ = query_task.await; assert!(
1679 result.is_ok(),
1680 "spawn/stop loop stalled after {completed}/{ITERATIONS} iterations — \
1681 DashMap convoy starvation likely"
1682 );
1683 assert_eq!(
1684 completed, ITERATIONS,
1685 "expected {ITERATIONS} completed iterations, got {completed}"
1686 );
1687 assert!(
1688 query_count.load(Ordering::Relaxed) > 0,
1689 "concurrent QueryChild queries never succeeded — query task may not have run"
1690 );
1691
1692 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1694 port.send(
1695 &client,
1696 IntrospectMessage::QueryChild {
1697 child_ref: hyperactor_reference::Reference::Proc(proc.proc_id().clone()),
1698 reply: reply_port.bind(),
1699 },
1700 )
1701 .unwrap();
1702 let final_payload =
1703 tokio::time::timeout(std::time::Duration::from_secs(5), reply_rx.recv())
1704 .await
1705 .expect("final QueryChild timed out")
1706 .expect("final QueryChild channel closed");
1707 let attrs: hyperactor_config::Attrs =
1708 serde_json::from_str(&final_payload.attrs).expect("valid attrs JSON");
1709 assert_eq!(
1710 attrs.get(crate::introspect::NODE_TYPE).map(String::as_str),
1711 Some("proc"),
1712 );
1713 }
1714}