1use std::any::type_name;
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::fmt;
13use std::ops::Deref;
14use std::panic::Location;
15use std::sync::Arc;
16use std::sync::OnceLock;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering;
19use std::time::Duration;
20
21use hyperactor::Actor;
22use hyperactor::ActorHandle;
23use hyperactor::ActorId;
24use hyperactor::ActorRef;
25use hyperactor::Named;
26use hyperactor::ProcId;
27use hyperactor::RemoteMessage;
28use hyperactor::accum::ReducerOpts;
29use hyperactor::actor::ActorStatus;
30use hyperactor::actor::Referable;
31use hyperactor::actor::remote::Remote;
32use hyperactor::channel;
33use hyperactor::channel::ChannelAddr;
34use hyperactor::clock::Clock;
35use hyperactor::clock::RealClock;
36use hyperactor::config;
37use hyperactor::config::CONFIG;
38use hyperactor::config::ConfigAttr;
39use hyperactor::context;
40use hyperactor::declare_attrs;
41use hyperactor::mailbox::DialMailboxRouter;
42use hyperactor::mailbox::MailboxServer;
43use hyperactor::supervision::ActorSupervisionEvent;
44use ndslice::Extent;
45use ndslice::ViewExt as _;
46use ndslice::view;
47use ndslice::view::CollectMeshExt;
48use ndslice::view::MapIntoExt;
49use ndslice::view::Ranked;
50use ndslice::view::Region;
51use serde::Deserialize;
52use serde::Serialize;
53use tokio::sync::Notify;
54use tracing::Instrument;
55
56use crate::CommActor;
57use crate::alloc::Alloc;
58use crate::alloc::AllocExt;
59use crate::alloc::AllocatedProc;
60use crate::assign::Ranks;
61use crate::comm::CommActorMode;
62use crate::proc_mesh::mesh_agent;
63use crate::proc_mesh::mesh_agent::ActorState;
64use crate::proc_mesh::mesh_agent::MeshAgentMessageClient;
65use crate::proc_mesh::mesh_agent::ProcMeshAgent;
66use crate::proc_mesh::mesh_agent::ReconfigurableMailboxSender;
67use crate::resource;
68use crate::resource::GetRankStatus;
69use crate::resource::Status;
70use crate::v1;
71use crate::v1::ActorMesh;
72use crate::v1::ActorMeshRef;
73use crate::v1::Error;
74use crate::v1::HostMeshRef;
75use crate::v1::Name;
76use crate::v1::ValueMesh;
77use crate::v1::host_mesh::mesh_agent::ProcState;
78use crate::v1::host_mesh::mesh_to_rankedvalues_with_default;
79use crate::v1::mesh_controller::ActorMeshController;
80
declare_attrs! {
    // Maximum idle time passed to `GetRankStatus::wait` while waiting for
    // per-rank statuses after spawning an actor mesh (`spawn_with_name_inner`)
    // and after stopping one (`stop_actor_by_name_inner`). Overridable via the
    // `HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE` environment variable.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE".to_string()),
        py_name: None,
    })
    pub attr ACTOR_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    // Per-reply timeout used by `ProcMeshRef::actor_states` while collecting
    // actor states from the proc mesh agents; procs that have not replied when
    // it expires are reported with a `Status::Timeout` state. Overridable via
    // the `HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE` environment variable.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE".to_string()),
        py_name: None,
    })
    pub attr GET_ACTOR_STATE_MAX_IDLE: Duration = Duration::from_secs(60);
}
96
/// A reference to a single proc within a proc mesh, together with the mesh
/// agent that manages it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ProcRef {
    /// The id of the referenced proc.
    proc_id: ProcId,
    /// The rank this proc was assigned when its mesh was created. Replies from
    /// the agent (status, actor state) are matched against this value, and it
    /// is preserved when a mesh reference is sliced.
    create_rank: usize,
    /// The `ProcMeshAgent` on this proc; status and actor-state requests are
    /// sent to it.
    agent: ActorRef<ProcMeshAgent>,
}
106
107impl ProcRef {
108 pub(crate) fn new(proc_id: ProcId, create_rank: usize, agent: ActorRef<ProcMeshAgent>) -> Self {
109 Self {
110 proc_id,
111 create_rank,
112 agent,
113 }
114 }
115
116 pub(crate) async fn status(&self, cx: &impl context::Actor) -> v1::Result<bool> {
119 let (port, mut rx) = cx.mailbox().open_port();
120 self.agent
121 .status(cx, port.bind())
122 .await
123 .map_err(|e| Error::CallError(self.agent.actor_id().clone(), e))?;
124 loop {
125 let (rank, status) = rx
126 .recv()
127 .await
128 .map_err(|e| Error::CallError(self.agent.actor_id().clone(), e.into()))?;
129 if rank == self.create_rank {
130 break Ok(status);
131 }
132 }
133 }
134
135 #[allow(dead_code)]
137 async fn actor_state(
138 &self,
139 cx: &impl context::Actor,
140 name: Name,
141 ) -> v1::Result<resource::State<ActorState>> {
142 let (port, mut rx) = cx.mailbox().open_port::<resource::State<ActorState>>();
143 self.agent
144 .send(
145 cx,
146 resource::GetState::<ActorState> {
147 name: name.clone(),
148 reply: port.bind(),
149 },
150 )
151 .map_err(|e| Error::CallError(self.agent.actor_id().clone(), e.into()))?;
152 let state = rx
153 .recv()
154 .await
155 .map_err(|e| Error::CallError(self.agent.actor_id().clone(), e.into()))?;
156 if let Some(ref inner) = state.state {
157 let rank = inner.create_rank;
158 if rank == self.create_rank {
159 Ok(state)
160 } else {
161 Err(Error::CallError(
162 self.agent.actor_id().clone(),
163 anyhow::anyhow!(
164 "Rank on mesh agent not matching for Actor {}: returned {}, expected {}",
165 name,
166 rank,
167 self.create_rank
168 ),
169 ))
170 }
171 } else {
172 Err(Error::CallError(
173 self.agent.actor_id().clone(),
174 anyhow::anyhow!("Actor {} does not exist", name),
175 ))
176 }
177 }
178
179 pub fn proc_id(&self) -> &ProcId {
180 &self.proc_id
181 }
182
183 pub(crate) fn actor_id(&self, name: &Name) -> ActorId {
184 self.proc_id.actor_id(name.to_string(), 0)
185 }
186
187 pub(crate) fn attest<A: Referable>(&self, name: &Name) -> ActorRef<A> {
190 ActorRef::attest(self.actor_id(name))
191 }
192}
193
/// An owned proc mesh.
///
/// Dereferences to its `ProcMeshRef`, so reference-level operations are
/// available directly. Dropping it only logs; use `stop` to shut the procs down.
#[derive(Debug)]
pub struct ProcMesh {
    /// The mesh's name (used in logging and when stopping an owned mesh).
    #[allow(dead_code)]
    name: Name,
    /// How the procs were obtained; determines what `stop` does.
    allocation: ProcMeshAllocation,
    /// Name of the comm actor mesh spawned on the procs, if one was spawned.
    #[allow(dead_code)]
    comm_actor_name: Option<Name>,
    /// Reference to the whole mesh, handed out through `Deref`.
    current_ref: ProcMeshRef,
}
204
205impl ProcMesh {
206 async fn create(
207 cx: &impl context::Actor,
208 name: Name,
209 allocation: ProcMeshAllocation,
210 spawn_comm_actor: bool,
211 ) -> v1::Result<Self> {
212 let comm_actor_name = if spawn_comm_actor {
213 Some(Name::new("comm"))
214 } else {
215 None
216 };
217
218 let region = allocation.extent().clone().into();
219 let ranks = allocation.ranks();
220 let root_comm_actor = comm_actor_name.as_ref().map(|name| {
221 ActorRef::attest(
222 ranks
223 .first()
224 .expect("root mesh cannot be empty")
225 .actor_id(name),
226 )
227 });
228 let host_mesh = allocation.hosts();
229 let current_ref = ProcMeshRef::new(
230 name.clone(),
231 region,
232 ranks,
233 host_mesh.cloned(),
234 None, None, )
237 .unwrap();
238
239 let mut proc_mesh = Self {
240 name,
241 allocation,
242 comm_actor_name: comm_actor_name.clone(),
243 current_ref,
244 };
245
246 if let Some(comm_actor_name) = comm_actor_name {
247 let comm_actor_mesh = proc_mesh
250 .spawn_with_name::<CommActor>(cx, comm_actor_name, &Default::default())
251 .await?;
252 let address_book: HashMap<_, _> = comm_actor_mesh
253 .iter()
254 .map(|(point, actor_ref)| (point.rank(), actor_ref))
255 .collect();
256 for (rank, comm_actor) in &address_book {
259 comm_actor
260 .send(cx, CommActorMode::Mesh(*rank, address_book.clone()))
261 .map_err(|e| Error::SendingError(comm_actor.actor_id().clone(), Box::new(e)))?
262 }
263
264 proc_mesh.current_ref.root_comm_actor = root_comm_actor;
266 }
267
268 Ok(proc_mesh)
269 }
270
271 pub(crate) async fn create_owned_unchecked(
272 cx: &impl context::Actor,
273 name: Name,
274 extent: Extent,
275 hosts: HostMeshRef,
276 ranks: Vec<ProcRef>,
277 ) -> v1::Result<Self> {
278 Self::create(
279 cx,
280 name,
281 ProcMeshAllocation::Owned {
282 hosts,
283 extent,
284 ranks: Arc::new(ranks),
285 },
286 true,
287 )
288 .await
289 }
290
291 fn alloc_counter() -> &'static AtomicUsize {
292 static C: OnceLock<AtomicUsize> = OnceLock::new();
293 C.get_or_init(|| AtomicUsize::new(0))
294 }
295
296 #[track_caller]
299 pub async fn allocate(
300 cx: &impl context::Actor,
301 alloc: Box<dyn Alloc + Send + Sync + 'static>,
302 name: &str,
303 ) -> v1::Result<Self> {
304 let caller = Location::caller();
305 Self::allocate_inner(cx, alloc, Name::new(name), caller).await
306 }
307
308 #[hyperactor::instrument(fields(proc_mesh=name.to_string()))]
310 async fn allocate_inner(
311 cx: &impl context::Actor,
312 mut alloc: Box<dyn Alloc + Send + Sync + 'static>,
313 name: Name,
314 caller: &'static Location<'static>,
315 ) -> v1::Result<Self> {
316 let alloc_id = Self::alloc_counter().fetch_add(1, Ordering::Relaxed) + 1;
317 tracing::info!(
318 name = "ProcMeshStatus",
319 status = "Allocate::Attempt",
320 %caller,
321 alloc_id,
322 shape = ?alloc.shape(),
323 "allocating proc mesh"
324 );
325
326 let running = alloc
327 .initialize()
328 .instrument(tracing::info_span!(
329 "ProcMeshStatus::Allocate::Initialize",
330 alloc_id,
331 proc_mesh = %name
332 ))
333 .await?;
334
335 let proc = cx.instance().proc();
341
342 let proc_channel_addr = {
344 let _guard =
345 tracing::info_span!("allocate_serve_proc", proc_id = %proc.proc_id()).entered();
346 let (addr, rx) = channel::serve(ChannelAddr::any(alloc.transport()))?;
347 proc.clone().serve(rx);
348 tracing::info!(
349 name = "ProcMeshStatus",
350 status = "Allocate::ChannelServe",
351 proc_mesh = %name,
352 %addr,
353 "proc started listening on addr: {addr}"
354 );
355 addr
356 };
357
358 let bind_allocated_procs = |router: &DialMailboxRouter| {
359 for AllocatedProc { proc_id, addr, .. } in running.iter() {
361 if proc_id.is_direct() {
362 continue;
363 }
364 router.bind(proc_id.clone().into(), addr.clone());
365 }
366 };
367
368 if let Some(router) = proc.forwarder().downcast_ref() {
373 bind_allocated_procs(router);
374 } else if let Some(router) = proc
375 .forwarder()
376 .downcast_ref::<ReconfigurableMailboxSender>()
377 {
378 bind_allocated_procs(
379 router
380 .as_inner()
381 .map_err(|_| Error::UnroutableMesh())?
382 .as_configured()
383 .ok_or(Error::UnroutableMesh())?
384 .downcast_ref()
385 .ok_or(Error::UnroutableMesh())?,
386 );
387 } else {
388 return Err(Error::UnroutableMesh());
389 }
390
391 let address_book: HashMap<_, _> = running
394 .iter()
395 .map(
396 |AllocatedProc {
397 addr, mesh_agent, ..
398 }| { (mesh_agent.actor_id().proc_id().clone(), addr.clone()) },
399 )
400 .collect();
401
402 let (config_handle, mut config_receiver) = cx.mailbox().open_port();
403 for (rank, AllocatedProc { mesh_agent, .. }) in running.iter().enumerate() {
404 mesh_agent
405 .configure(
406 cx,
407 rank,
408 proc_channel_addr.clone(),
409 None, address_book.clone(),
411 config_handle.bind(),
412 true,
413 )
414 .await
415 .map_err(Error::ConfigurationError)?;
416 }
417 let mut completed = Ranks::new(running.len());
418 while !completed.is_full() {
419 let rank = config_receiver
420 .recv()
421 .await
422 .map_err(|err| Error::ConfigurationError(err.into()))?;
423 if completed.insert(rank, rank).is_some() {
424 tracing::warn!("multiple completions received for rank {}", rank);
425 }
426 }
427
428 let ranks: Vec<_> = running
429 .into_iter()
430 .enumerate()
431 .map(|(create_rank, allocated)| ProcRef {
432 proc_id: allocated.proc_id,
433 create_rank,
434 agent: allocated.mesh_agent,
435 })
436 .collect();
437
438 let stop = Arc::new(Notify::new());
439 let extent = alloc.extent().clone();
440 let alloc_name = alloc.world_id().to_string();
441
442 {
443 let stop = Arc::clone(&stop);
444
445 tokio::spawn(
446 async move {
447 loop {
448 tokio::select! {
449 _ = stop.notified() => {
450 if let Err(error) = alloc.stop_and_wait().await {
452 tracing::error!(
453 name = "ProcMeshStatus",
454 alloc_name = %alloc.world_id(),
455 status = "FailedToStopAlloc",
456 %error,
457 );
458 }
459 break;
460 }
461 proc_state = alloc.next() => {
463 match proc_state {
464 None => break,
466 Some(proc_state) => {
467 tracing::debug!(
468 alloc_name = %alloc.world_id(),
469 "unmonitored allocation event: {}", proc_state);
470 }
471 }
472
473 }
474 }
475 }
476 }
477 .instrument(tracing::info_span!("alloc_monitor")),
478 );
479 }
480
481 let mesh = Self::create(
482 cx,
483 name,
484 ProcMeshAllocation::Allocated {
485 alloc_name,
486 stop,
487 extent,
488 ranks: Arc::new(ranks),
489 },
490 true, )
492 .await;
493 match &mesh {
494 Ok(_) => tracing::info!(name = "ProcMeshStatus", status = "Allocate::Created"),
495 Err(error) => {
496 tracing::info!(name = "ProcMeshStatus", status = "Allocate::Failed", %error)
497 }
498 }
499 mesh
500 }
501
502 #[cfg(test)]
504 pub(crate) fn detach(self) -> ProcMeshRef {
505 self.current_ref.clone()
507 }
508
509 pub async fn stop(&mut self, cx: &impl context::Actor) -> anyhow::Result<()> {
511 let region = self.region.clone();
512 match &mut self.allocation {
513 ProcMeshAllocation::Allocated {
514 stop, alloc_name, ..
515 } => {
516 stop.notify_one();
517 tracing::info!(
518 name = "ProcMeshStatus",
519 proc_mesh = %self.name,
520 alloc_name,
521 status = "StoppingAlloc",
522 "sending stop to alloc {alloc_name}; check its log for stop status",
523 );
524 Ok(())
525 }
526 ProcMeshAllocation::Owned { hosts, .. } => {
527 let procs = self.current_ref.proc_ids().collect::<Vec<ProcId>>();
528 hosts.stop_proc_mesh(cx, &self.name, procs, region).await
531 }
532 }
533 }
534}
535
536impl fmt::Display for ProcMesh {
537 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
538 write!(f, "{}", self.current_ref)
539 }
540}
541
/// Lets an owned `ProcMesh` be used wherever its `ProcMeshRef` is expected,
/// so reference-level methods (e.g. `spawn`, `status`) are callable on it.
impl Deref for ProcMesh {
    type Target = ProcMeshRef;

    fn deref(&self) -> &Self::Target {
        &self.current_ref
    }
}
549
/// Emits a `ProcMeshStatus` "Dropped" log event. This impl only logs and does
/// not call `stop`; call `ProcMesh::stop` explicitly to shut the procs down.
impl Drop for ProcMesh {
    fn drop(&mut self) {
        tracing::info!(
            name = "ProcMeshStatus",
            proc_mesh = %self.name,
            status = "Dropped",
        );
    }
}
559
/// How a `ProcMesh`'s procs were obtained, which determines how `stop` works.
enum ProcMeshAllocation {
    /// Procs obtained from an `Alloc` via `ProcMesh::allocate`. The alloc
    /// itself is owned by a background monitor task.
    Allocated {
        /// The alloc's world id, used in log messages.
        alloc_name: String,

        /// Notified by `ProcMesh::stop` to make the monitor task call
        /// `stop_and_wait` on the alloc.
        stop: Arc<Notify>,

        /// The alloc's extent.
        extent: Extent,

        /// The allocated procs; each `create_rank` is its index in this vector.
        ranks: Arc<Vec<ProcRef>>,
    },

    /// Procs belonging to a host mesh (see `create_owned_unchecked`); stopped
    /// through `HostMeshRef::stop_proc_mesh`.
    Owned {
        /// The host mesh that owns the procs.
        hosts: HostMeshRef,
        /// The mesh's extent.
        extent: Extent,
        /// The procs, in rank order.
        ranks: Arc<Vec<ProcRef>>,
    },
}
587
588impl ProcMeshAllocation {
589 fn extent(&self) -> &Extent {
590 match self {
591 ProcMeshAllocation::Allocated { extent, .. } => extent,
592 ProcMeshAllocation::Owned { extent, .. } => extent,
593 }
594 }
595
596 fn ranks(&self) -> Arc<Vec<ProcRef>> {
597 Arc::clone(match self {
598 ProcMeshAllocation::Allocated { ranks, .. } => ranks,
599 ProcMeshAllocation::Owned { ranks, .. } => ranks,
600 })
601 }
602
603 fn hosts(&self) -> Option<&HostMeshRef> {
604 match self {
605 ProcMeshAllocation::Allocated { .. } => None,
606 ProcMeshAllocation::Owned { hosts, .. } => Some(hosts),
607 }
608 }
609}
610
611impl fmt::Debug for ProcMeshAllocation {
612 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
613 match self {
614 ProcMeshAllocation::Allocated { ranks, .. } => f
615 .debug_struct("ProcMeshAllocation::Allocated")
616 .field("alloc", &"<dyn Alloc>")
617 .field("ranks", ranks)
618 .finish(),
619 ProcMeshAllocation::Owned {
620 hosts,
621 ranks,
622 extent: _,
623 } => f
624 .debug_struct("ProcMeshAllocation::Owned")
625 .field("hosts", hosts)
626 .field("ranks", ranks)
627 .finish(),
628 }
629 }
630}
631
/// A cloneable, serializable reference to a proc mesh or to a slice of one.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct ProcMeshRef {
    /// The mesh's name.
    name: Name,
    /// The region covered by this reference; `new` enforces exactly one rank
    /// per entry in `ranks`.
    region: Region,
    /// The procs in this reference, ordered by position within `region`.
    ranks: Arc<Vec<ProcRef>>,
    /// The host mesh the procs belong to; `None` for alloc-based meshes.
    host_mesh: Option<HostMeshRef>,
    /// For a sliced reference, the region of the unsliced mesh it was cut
    /// from; `None` for an unsliced mesh.
    pub(crate) root_region: Option<Region>,
    /// The comm actor on the root mesh's first proc, if a comm actor mesh was
    /// spawned; carried over unchanged when slicing.
    pub(crate) root_comm_actor: Option<ActorRef<CommActor>>,
}
655
656impl ProcMeshRef {
657 #[allow(clippy::result_large_err)]
659 fn new(
660 name: Name,
661 region: Region,
662 ranks: Arc<Vec<ProcRef>>,
663 host_mesh: Option<HostMeshRef>,
664 root_region: Option<Region>,
665 root_comm_actor: Option<ActorRef<CommActor>>,
666 ) -> v1::Result<Self> {
667 if region.num_ranks() != ranks.len() {
668 return Err(v1::Error::InvalidRankCardinality {
669 expected: region.num_ranks(),
670 actual: ranks.len(),
671 });
672 }
673 Ok(Self {
674 name,
675 region,
676 ranks,
677 host_mesh,
678 root_region,
679 root_comm_actor,
680 })
681 }
682
683 pub(crate) fn root_comm_actor(&self) -> Option<&ActorRef<CommActor>> {
684 self.root_comm_actor.as_ref()
685 }
686
687 pub fn name(&self) -> &Name {
688 &self.name
689 }
690
691 pub fn host_mesh_name(&self) -> Option<&Name> {
692 self.host_mesh.as_ref().map(|h| h.name())
693 }
694
695 pub fn hosts(&self) -> Option<&HostMeshRef> {
698 self.host_mesh.as_ref()
699 }
700
701 pub async fn status(&self, cx: &impl context::Actor) -> v1::Result<ValueMesh<bool>> {
703 let vm: ValueMesh<_> = self.map_into(|proc_ref| {
704 let proc_ref = proc_ref.clone();
705 async move { proc_ref.status(cx).await }
706 });
707 vm.join().await.transpose()
708 }
709
710 pub(crate) fn agent_mesh(&self) -> ActorMeshRef<ProcMeshAgent> {
711 let agent_name = self.ranks.first().unwrap().agent.actor_id().name();
712 ActorMeshRef::new(Name::new_reserved(agent_name), self.clone())
714 }
715
716 pub async fn actor_states(
718 &self,
719 cx: &impl context::Actor,
720 name: Name,
721 ) -> v1::Result<ValueMesh<resource::State<ActorState>>> {
722 let agent_mesh = self.agent_mesh();
723 let (port, mut rx) = cx.mailbox().open_port::<resource::State<ActorState>>();
724 agent_mesh.cast(
727 cx,
728 resource::GetState::<ActorState> {
729 name: name.clone(),
730 reply: port.bind(),
731 },
732 )?;
733 let expected = self.ranks.len();
734 let mut states = Vec::with_capacity(expected);
735 let timeout = config::global::get(GET_ACTOR_STATE_MAX_IDLE);
736 for _ in 0..expected {
737 let state = RealClock.timeout(timeout, rx.recv()).await;
743 if let Ok(state) = state {
744 let state = state?;
746 match state.state {
747 Some(ref inner) => {
748 states.push((inner.create_rank, state));
749 }
750 None => {
751 return Err(Error::NotExist(state.name));
752 }
753 }
754 } else {
755 tracing::error!(
756 "timeout waiting for a message after {:?} from proc mesh agent in mesh {}",
757 timeout,
758 agent_mesh
759 );
760 let all_ranks = (0..self.ranks.len()).collect::<HashSet<_>>();
763 let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
764 let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
765 assert_eq!(leftover_ranks.len(), expected - states.len());
766 while states.len() < expected {
767 let rank = *leftover_ranks
768 .pop()
769 .expect("leftover ranks should not be empty");
770 let agent = agent_mesh.get(rank).expect("agent should exist");
771 let agent_id = agent.actor_id().clone();
772 states.push((
773 rank,
775 resource::State {
776 name: name.clone(),
777 status: resource::Status::Timeout(timeout),
778 state: Some(ActorState {
781 actor_id: agent_id.clone(),
782 create_rank: rank,
783 supervision_events: vec![ActorSupervisionEvent::new(
784 agent_id,
785 None,
786 ActorStatus::Stopped,
787 None,
788 )],
789 }),
790 },
791 ));
792 }
793 break;
794 }
795 }
796 states.sort_by_key(|(rank, _)| *rank);
800 let vm = states
801 .into_iter()
802 .map(|(_, state)| state)
803 .collect_mesh::<ValueMesh<_>>(self.region.clone())?;
804 Ok(vm)
805 }
806
807 pub async fn proc_states(
808 &self,
809 cx: &impl context::Actor,
810 ) -> v1::Result<Option<ValueMesh<resource::State<ProcState>>>> {
811 let names = self.proc_ids().collect::<Vec<ProcId>>();
812 if let Some(host_mesh) = &self.host_mesh {
813 Ok(Some(
814 host_mesh
815 .proc_states(cx, names, self.region.clone())
816 .await?,
817 ))
818 } else {
819 Ok(None)
820 }
821 }
822
823 pub(crate) fn proc_ids(&self) -> impl Iterator<Item = ProcId> {
825 self.ranks.iter().map(|proc_ref| proc_ref.proc_id.clone())
826 }
827
828 pub async fn spawn<A: Actor + Referable>(
838 &self,
839 cx: &impl context::Actor,
840 name: &str,
841 params: &A::Params,
842 ) -> v1::Result<ActorMesh<A>>
843 where
844 A::Params: RemoteMessage,
845 {
846 self.spawn_with_name(cx, Name::new(name), params).await
847 }
848
849 pub async fn spawn_service<A: Actor + Referable>(
857 &self,
858 cx: &impl context::Actor,
859 name: &str,
860 params: &A::Params,
861 ) -> v1::Result<ActorMesh<A>>
862 where
863 A::Params: RemoteMessage,
864 {
865 self.spawn_with_name(cx, Name::new_reserved(name), params)
866 .await
867 }
868
869 #[hyperactor::instrument(fields(
883 host_mesh=self.host_mesh_name().map(|n| n.to_string()),
884 proc_mesh=self.name.to_string(),
885 actor_name=name.to_string(),
886 ))]
887 pub(crate) async fn spawn_with_name<A: Actor + Referable>(
888 &self,
889 cx: &impl context::Actor,
890 name: Name,
891 params: &A::Params,
892 ) -> v1::Result<ActorMesh<A>>
893 where
894 A::Params: RemoteMessage,
895 {
896 tracing::info!(
897 name = "ProcMeshStatus",
898 status = "ActorMesh::Spawn::Attempt",
899 );
900 tracing::info!(name = "ActorMeshStatus", status = "Spawn::Attempt");
901 let result = self.spawn_with_name_inner(cx, name, params).await;
902 match &result {
903 Ok(_) => {
904 tracing::info!(
905 name = "ProcMeshStatus",
906 status = "ActorMesh::Spawn::Success",
907 );
908 tracing::info!(name = "ActorMeshStatus", status = "Spawn::Success");
909 }
910 Err(error) => {
911 tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Spawn::Failed", %error);
912 tracing::error!(name = "ActorMeshStatus", status = "Spawn::Failed", %error);
913 }
914 }
915 result
916 }
917
918 async fn spawn_with_name_inner<A: Actor + Referable>(
919 &self,
920 cx: &impl context::Actor,
921 name: Name,
922 params: &A::Params,
923 ) -> v1::Result<ActorMesh<A>>
924 where
925 A::Params: RemoteMessage,
926 {
927 let remote = Remote::collect();
928 let actor_type = remote
931 .name_of::<A>()
932 .ok_or(Error::ActorTypeNotRegistered(type_name::<A>().to_string()))?
933 .to_string();
934
935 let serialized_params = bincode::serialize(params)?;
936 let agent_mesh = self.agent_mesh();
937
938 agent_mesh.cast(
939 cx,
940 resource::CreateOrUpdate::<mesh_agent::ActorSpec> {
941 name: name.clone(),
942 rank: Default::default(),
943 spec: mesh_agent::ActorSpec {
944 actor_type: actor_type.clone(),
945 params_data: serialized_params.clone(),
946 },
947 },
948 )?;
949
950 let region = self.region().clone();
951 let (port, rx) = cx.mailbox().open_accum_port_opts(
961 crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
964 Some(ReducerOpts {
965 max_update_interval: Some(Duration::from_millis(50)),
966 }),
967 );
968
969 let mut reply = port.bind();
970 reply.return_undeliverable(false);
973 agent_mesh.cast(
976 cx,
977 resource::GetRankStatus {
978 name: name.clone(),
979 reply,
980 },
981 )?;
982
983 let start_time = RealClock.now();
984
985 let mesh = match GetRankStatus::wait(
994 rx,
995 self.ranks.len(),
996 config::global::get(ACTOR_SPAWN_MAX_IDLE),
997 region.clone(), )
999 .await
1000 {
1001 Ok(statuses) => {
1002 let has_terminating = statuses.values().any(|s| s.is_terminating());
1006 if !has_terminating {
1007 Ok(ActorMesh::new(self.clone(), name))
1008 } else {
1009 let legacy = mesh_to_rankedvalues_with_default(
1010 &statuses,
1011 Status::NotExist,
1012 Status::is_not_exist,
1013 self.ranks.len(),
1014 );
1015 Err(Error::ActorSpawnError { statuses: legacy })
1016 }
1017 }
1018 Err(complete) => {
1019 let elapsed = start_time.elapsed();
1022 let legacy = mesh_to_rankedvalues_with_default(
1023 &complete,
1024 Status::Timeout(elapsed),
1025 Status::is_not_exist,
1026 self.ranks.len(),
1027 );
1028 Err(Error::ActorSpawnError { statuses: legacy })
1029 }
1030 }?;
1031 let _controller: ActorHandle<ActorMeshController<A>> =
1034 ActorMeshController::<A>::spawn(cx, mesh.deref().clone())
1035 .await
1036 .map_err(|e| Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
1037 Ok(mesh)
1038 }
1039
1040 #[hyperactor::instrument(fields(
1042 host_mesh = self.host_mesh_name().map(|n| n.to_string()),
1043 proc_mesh = self.name.to_string(),
1044 actor_mesh = mesh_name.to_string(),
1045 ))]
1046 pub(crate) async fn stop_actor_by_name(
1047 &self,
1048 cx: &impl context::Actor,
1049 mesh_name: Name,
1050 ) -> v1::Result<()> {
1051 tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Attempt");
1052 tracing::info!(name = "ActorMeshStatus", status = "Stop::Attempt");
1053 let result = self.stop_actor_by_name_inner(cx, mesh_name).await;
1054 match &result {
1055 Ok(_) => {
1056 tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Success");
1057 tracing::info!(name = "ActorMeshStatus", status = "Stop::Success");
1058 }
1059 Err(error) => {
1060 tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Failed", %error);
1061 tracing::error!(name = "ActorMeshStatus", status = "Stop::Failed", %error);
1062 }
1063 }
1064 result
1065 }
1066
1067 async fn stop_actor_by_name_inner(
1068 &self,
1069 cx: &impl context::Actor,
1070 mesh_name: Name,
1071 ) -> v1::Result<()> {
1072 let region = self.region().clone();
1073 let agent_mesh = self.agent_mesh();
1074 agent_mesh.cast(
1075 cx,
1076 resource::Stop {
1077 name: mesh_name.clone(),
1078 },
1079 )?;
1080
1081 let (port, rx) = cx.mailbox().open_accum_port_opts(
1091 crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
1094 Some(ReducerOpts {
1095 max_update_interval: Some(Duration::from_millis(50)),
1096 }),
1097 );
1098 agent_mesh.cast(
1099 cx,
1100 resource::GetRankStatus {
1101 name: mesh_name,
1102 reply: port.bind(),
1103 },
1104 )?;
1105 let start_time = RealClock.now();
1106
1107 let max_idle_time = config::global::get(ACTOR_SPAWN_MAX_IDLE);
1109 match GetRankStatus::wait(
1110 rx,
1111 self.ranks.len(),
1112 max_idle_time,
1113 region.clone(), )
1115 .await
1116 {
1117 Ok(statuses) => {
1118 let all_stopped = statuses.values().all(|s| s.is_terminating());
1122 if all_stopped {
1123 Ok(())
1124 } else {
1125 let legacy = mesh_to_rankedvalues_with_default(
1126 &statuses,
1127 Status::NotExist,
1128 Status::is_not_exist,
1129 self.ranks.len(),
1130 );
1131 Err(Error::ActorStopError { statuses: legacy })
1132 }
1133 }
1134 Err(complete) => {
1135 let legacy = mesh_to_rankedvalues_with_default(
1138 &complete,
1139 Status::Timeout(start_time.elapsed()),
1140 Status::is_not_exist,
1141 self.ranks.len(),
1142 );
1143 Err(Error::ActorStopError { statuses: legacy })
1144 }
1145 }
1146 }
1147}
1148
1149impl fmt::Display for ProcMeshRef {
1150 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1151 write!(f, "{}{{{}}}", self.name, self.region)
1152 }
1153}
1154
1155impl view::Ranked for ProcMeshRef {
1156 type Item = ProcRef;
1157
1158 fn region(&self) -> &Region {
1159 &self.region
1160 }
1161
1162 fn get(&self, rank: usize) -> Option<&Self::Item> {
1163 self.ranks.get(rank)
1164 }
1165}
1166
1167impl view::RankedSliceable for ProcMeshRef {
1168 fn sliced(&self, region: Region) -> Self {
1169 debug_assert!(region.is_subset(view::Ranked::region(self)));
1170 let ranks = self
1171 .region()
1172 .remap(®ion)
1173 .unwrap()
1174 .map(|index| self.get(index).unwrap().clone())
1175 .collect();
1176 Self::new(
1177 self.name.clone(),
1178 region,
1179 Arc::new(ranks),
1180 self.host_mesh.clone(),
1181 Some(self.root_region.as_ref().unwrap_or(&self.region).clone()),
1182 self.root_comm_actor.clone(),
1183 )
1184 .unwrap()
1185 }
1186}
1187
#[cfg(test)]
mod tests {
    use ndslice::ViewExt;
    use ndslice::extent;
    use timed_test::async_timed_test;

    use crate::resource::RankedValues;
    use crate::resource::Status;
    use crate::v1::testactor;
    use crate::v1::testing;

    // Allocates a local 4-proc mesh and checks:
    // - its extent and proc count;
    // - that the returned router has at least one prefix bound;
    // - that every proc reports `true` both individually and through the
    //   mesh-wide `status` query.
    #[tokio::test]
    async fn test_proc_mesh_allocate() {
        let (mesh, actor, router) = testing::local_proc_mesh(extent!(replica = 4)).await;
        assert_eq!(mesh.extent(), extent!(replica = 4));
        assert_eq!(mesh.ranks.len(), 4);
        assert!(!router.prefixes().is_empty());

        // Per-proc status queries.
        for proc_ref in mesh.values() {
            assert!(proc_ref.status(&actor).await.unwrap());
        }

        // Mesh-wide status query.
        assert!(
            mesh.status(&actor)
                .await
                .unwrap()
                .values()
                .all(|status| status)
        );
    }

    // Spawns a test actor mesh on each test proc mesh and checks its shape.
    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_spawn_actor() {
        hyperactor_telemetry::initialize_logging(hyperactor::clock::ClockKind::default());

        let instance = testing::instance().await;

        for proc_mesh in testing::proc_meshes(&instance, extent!(replicas = 4, hosts = 2)).await {
            testactor::assert_mesh_shape(proc_mesh.spawn(instance, "test", &()).await.unwrap())
                .await;
        }
    }

    // Spawning an actor whose creation fails must return an `ActorSpawnError`
    // carrying `Status::Failed` for all 8 ranks.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_failing_spawn_actor() {
        hyperactor_telemetry::initialize_logging(hyperactor::clock::ClockKind::default());

        let instance = testing::instance().await;

        for proc_mesh in testing::proc_meshes(&instance, extent!(replicas = 4, hosts = 2)).await {
            let err = proc_mesh
                .spawn::<testactor::FailingCreateTestActor>(instance, "testfail", &())
                .await
                .unwrap_err();
            let statuses = err.into_actor_spawn_error().unwrap();
            assert_eq!(
                statuses,
                RankedValues::from((0..8, Status::Failed("test failure".to_string()))),
            );
        }
    }
}