hyperactor_mesh/
alloc.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

//! This module defines a proc allocator interface as well as a multi-process
//! (local) allocator, [`ProcessAllocator`].
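//!
//! A minimal end-to-end sketch (not compiled here; the bootstrap binary path
//! below is a placeholder):
//!
//! ```ignore
//! use tokio::process::Command;
//!
//! // Launch procs as child processes of a bootstrap binary.
//! let mut allocator = ProcessAllocator::new(Command::new("/path/to/bootstrap"));
//! let mut alloc = allocator
//!     .allocate(AllocSpec {
//!         extent: ndslice::extent!(replica = 4),
//!         constraints: Default::default(),
//!         proc_name: None,
//!         transport: hyperactor::channel::ChannelTransport::Unix,
//!         proc_allocation_mode: Default::default(),
//!     })
//!     .await?;
//!
//! // React to allocation events as procs are created and started; once the
//! // desired procs are Running, stop the alloc and drain remaining events.
//! while let Some(state) = alloc.next().await {
//!     tracing::info!("proc event: {state}");
//! }
//! ```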

pub mod local;
pub mod process;
pub mod remoteprocess;
pub mod sim;

use std::collections::HashMap;
use std::fmt;

use async_trait::async_trait;
use enum_as_inner::EnumAsInner;
use hyperactor::ActorRef;
use hyperactor::ProcId;
use hyperactor::WorldId;
use hyperactor::channel::ChannelAddr;
use hyperactor::channel::ChannelTransport;
use hyperactor::channel::MetaTlsAddr;
use hyperactor_config::CONFIG;
use hyperactor_config::ConfigAttr;
use hyperactor_config::attrs::declare_attrs;
pub use local::LocalAlloc;
pub use local::LocalAllocator;
use mockall::predicate::*;
use mockall::*;
use ndslice::Shape;
use ndslice::Slice;
use ndslice::view::Extent;
use ndslice::view::Point;
pub use process::ProcessAlloc;
pub use process::ProcessAllocator;
use serde::Deserialize;
use serde::Serialize;
use strum::AsRefStr;
use typeuri::Named;

use crate::alloc::test_utils::MockAllocWrapper;
use crate::assign::Ranks;
use crate::proc_mesh::mesh_agent::ProcMeshAgent;
use crate::shortuuid::ShortUuid;

/// Errors that occur during allocation operations.
#[derive(Debug, thiserror::Error)]
pub enum AllocatorError {
    #[error("incomplete allocation; expected: {0}")]
    Incomplete(Extent),

    /// The requested shape is too large for the allocator.
    #[error("not enough resources; requested: {requested}, available: {available}")]
    NotEnoughResources { requested: Extent, available: usize },

    /// An uncategorized error from an underlying system.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

/// Constraints on the allocation.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AllocConstraints {
    /// Arbitrary name/value pairs that are interpreted by individual
    /// allocators to control the allocation process.
    pub match_labels: HashMap<String, String>,
}

/// Specifies how to interpret the extent dimensions for allocation.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Default)]
pub enum ProcAllocationMode {
    /// Proc-level allocation: splits the extent to allocate multiple processes per host.
    /// Requires at least 2 dimensions (e.g., [hosts: N, gpus: M]).
    /// Splits by the second-to-last dimension, creating N regions with M processes each.
    /// Used by MastAllocator.
    #[default]
    ProcLevel,
    /// Host-level allocation: each point in the extent is a host (no sub-host splitting).
    /// For extent!(region = 2, host = 4), creates 8 regions, each representing 1 host.
    /// Used by MastHostAllocator.
    HostLevel,
}

/// A specification (desired state) of an alloc.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AllocSpec {
    /// The requested extent of the alloc.
    // We currently assume that this shape is dense.
    // This should be validated, or even enforced by
    // way of types.
    pub extent: Extent,

    /// Constraints on the allocation.
    pub constraints: AllocConstraints,

    /// If specified, return procs using direct addressing with
    /// the provided proc name.
    pub proc_name: Option<String>,

    /// The transport to use for the procs in this alloc.
    pub transport: ChannelTransport,

    /// Specifies how to interpret the extent dimensions for allocation.
    /// Defaults to ProcLevel for backward compatibility.
    #[serde(default = "default_proc_allocation_mode")]
    pub proc_allocation_mode: ProcAllocationMode,
}

fn default_proc_allocation_mode() -> ProcAllocationMode {
    ProcAllocationMode::ProcLevel
}

/// The core allocator trait, implemented by all allocators.
#[automock(type Alloc=MockAllocWrapper;)]
#[async_trait]
pub trait Allocator {
    /// The type of [`Alloc`] produced by this allocator.
    type Alloc: Alloc;

    /// Create a new allocation. The allocation itself is generally
    /// returned immediately (after validating parameters, etc.);
    /// the caller is expected to respond to allocation events as
    /// the underlying procs are incrementally allocated.
    async fn allocate(&mut self, spec: AllocSpec) -> Result<Self::Alloc, AllocatorError>;
}

/// A proc's status. A proc can only monotonically move from
/// `Created` to `Running` to `Stopped`.
#[derive(
    Clone,
    Debug,
    PartialEq,
    EnumAsInner,
    Serialize,
    Deserialize,
    AsRefStr,
    Named
)]
pub enum ProcState {
    /// A proc was added to the alloc.
    Created {
        /// A key that uniquely identifies a created proc. The same key is
        /// used again to identify the proc when it becomes `Running`.
        create_key: ShortUuid,
        /// Its assigned point (in the alloc's extent).
        point: Point,
        /// The system process ID of the created child process.
        pid: u32,
    },
    /// A proc was started.
    Running {
        /// The key used to identify the created proc.
        create_key: ShortUuid,
        /// The proc's assigned ID.
        proc_id: ProcId,
        /// Reference to this proc's mesh agent. In the future, we'll reserve a
        /// 'well known' PID (0) for this purpose.
        mesh_agent: ActorRef<ProcMeshAgent>,
        /// The address of this proc. The endpoint of this address is
        /// the proc's mailbox, which accepts [`hyperactor::mailbox::MessageEnvelope`]s.
        addr: ChannelAddr,
    },
    /// A proc was stopped.
    Stopped {
        create_key: ShortUuid,
        reason: ProcStopReason,
    },
    /// The allocation process encountered an irrecoverable error. Depending on
    /// the implementation, allocation may continue transiently and calls to
    /// next() may still return some events, but the allocation will never
    /// complete. Callers can use the `description` to determine the reason for
    /// the failure.
    /// The allocation can then be cleaned up by calling `stop()` on the `Alloc`
    /// and draining the iterator for a clean shutdown.
    Failed {
        /// The world ID of the failed alloc.
        ///
        /// TODO: this is not meaningful with direct addressing.
        world_id: WorldId,
        /// A description of the failure.
        description: String,
    },
}

impl fmt::Display for ProcState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ProcState::Created {
                create_key,
                point,
                pid,
            } => {
                write!(f, "{}: created at ({}) with PID {}", create_key, point, pid)
            }
            ProcState::Running { proc_id, addr, .. } => {
                write!(f, "{}: running at {}", proc_id, addr)
            }
            ProcState::Stopped { create_key, reason } => {
                write!(f, "{}: stopped: {}", create_key, reason)
            }
            ProcState::Failed {
                description,
                world_id,
            } => {
                write!(f, "{}: failed: {}", world_id, description)
            }
        }
    }
}

/// The reason a proc stopped.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, EnumAsInner)]
pub enum ProcStopReason {
    /// The proc stopped gracefully, e.g., with exit code 0.
    Stopped,
    /// The proc exited with the provided exit code and stderr contents.
    Exited(i32, String),
    /// The proc was killed. The signal number is indicated;
    /// the flag indicates whether there was a core dump.
    Killed(i32, bool),
    /// The proc failed to respond to a watchdog request within a timeout.
    Watchdog,
    /// The host running the proc failed to respond to a watchdog request
    /// within a timeout.
    HostWatchdog,
    /// The proc failed for an unknown reason.
    Unknown,
}

impl fmt::Display for ProcStopReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Stopped => write!(f, "stopped"),
            Self::Exited(code, stderr) => {
                if stderr.is_empty() {
                    write!(f, "exited with code {}", code)
                } else {
                    write!(f, "exited with code {}: {}", code, stderr)
                }
            }
            Self::Killed(signal, dumped) => {
                write!(f, "killed with signal {} (core dumped={})", signal, dumped)
            }
            Self::Watchdog => write!(f, "proc watchdog failure"),
            Self::HostWatchdog => write!(f, "host watchdog failure"),
            Self::Unknown => write!(f, "unknown"),
        }
    }
}

/// An alloc is a specific allocation, returned by an [`Allocator`].
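///
/// A sketch of the expected event-driven usage (assuming `alloc` was obtained
/// from an [`Allocator`]; the match arms are placeholders):
///
/// ```ignore
/// while let Some(state) = alloc.next().await {
///     match state {
///         ProcState::Created { create_key, point, .. } => { /* record the assignment */ }
///         ProcState::Running { proc_id, addr, .. } => { /* the proc is reachable at addr */ }
///         ProcState::Stopped { create_key, reason } => { /* the proc went away */ }
///         ProcState::Failed { description, .. } => { /* the allocation failed */ }
///     }
/// }
/// // `next()` returned `None`: the alloc is stopped.
/// ```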
#[automock]
#[async_trait]
pub trait Alloc {
    /// Return the next proc event. `None` indicates that there are
    /// no more events, and that the alloc is stopped.
    async fn next(&mut self) -> Option<ProcState>;

    /// The spec against which this alloc is executing.
    fn spec(&self) -> &AllocSpec;

    /// The extent of the alloc.
    fn extent(&self) -> &Extent;

    /// The shape of the alloc. (Deprecated.)
    fn shape(&self) -> Shape {
        let slice = Slice::new_row_major(self.extent().sizes());
        Shape::new(self.extent().labels().to_vec(), slice).unwrap()
    }

    /// The world id of this alloc, uniquely identifying the alloc.
    /// Note: This will be removed in favor of a different naming scheme,
    /// once we excise "worlds" from hyperactor core.
    fn world_id(&self) -> &WorldId;

    /// The channel transport used by the procs in this alloc.
    fn transport(&self) -> ChannelTransport {
        self.spec().transport.clone()
    }

    /// Stop this alloc, shutting down all of its procs. A clean
    /// shutdown should result in Stopped events for all of its procs,
    /// followed by the end of the event stream.
    async fn stop(&mut self) -> Result<(), AllocatorError>;

    /// Stop this alloc and wait for all procs to stop. The call will
    /// block until all ProcState events have been drained.
    async fn stop_and_wait(&mut self) -> Result<(), AllocatorError> {
        tracing::error!(
            name = "AllocStatus",
            alloc_name = %self.world_id(),
            status = "StopAndWait",
        );
        self.stop().await?;
        while let Some(event) = self.next().await {
            tracing::debug!(
                alloc_name = %self.world_id(),
                "drained event: {event:?}"
            );
        }
        tracing::error!(
            name = "AllocStatus",
            alloc_name = %self.world_id(),
            status = "Stopped",
        );
        Ok(())
    }

    /// Returns whether the alloc is a local alloc: that is, its procs are
    /// not independent processes, but just threads in the selfsame process.
    fn is_local(&self) -> bool {
        false
    }

    /// The address that should be used to serve the client's router.
    fn client_router_addr(&self) -> ChannelAddr {
        ChannelAddr::any(self.transport())
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct AllocatedProc {
    pub create_key: ShortUuid,
    pub proc_id: ProcId,
    pub addr: ChannelAddr,
    pub mesh_agent: ActorRef<ProcMeshAgent>,
}

impl fmt::Display for AllocatedProc {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "AllocatedProc {{ create_key: {}, proc_id: {}, addr: {}, mesh_agent: {} }}",
            self.create_key, self.proc_id, self.addr, self.mesh_agent
        )
    }
}

#[async_trait]
pub(crate) trait AllocExt {
    /// Perform initial allocation, consuming events until the alloc is fully
    /// running. Returns the ranked procs.
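    ///
    /// A minimal sketch of the intended call pattern:
    ///
    /// ```ignore
    /// // The returned Vec is ordered by rank: procs[0] is the proc at rank 0.
    /// let procs: Vec<AllocatedProc> = alloc.initialize().await?;
    /// ```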
    async fn initialize(&mut self) -> Result<Vec<AllocatedProc>, AllocatorError>;
}

#[async_trait]
impl<A: ?Sized + Send + Alloc> AllocExt for A {
    async fn initialize(&mut self) -> Result<Vec<AllocatedProc>, AllocatorError> {
        // We wait for the full allocation to be running before returning the mesh.
        let shape = self.shape().clone();

        let mut created = Ranks::new(shape.slice().len());
        let mut running = Ranks::new(shape.slice().len());

        while !running.is_full() {
            let Some(state) = self.next().await else {
                // Alloc finished before it was fully allocated.
                return Err(AllocatorError::Incomplete(self.extent().clone()));
            };

            let name = tracing::Span::current()
                .metadata()
                .map(|m| m.name())
                .unwrap_or("initialize");
            let status = format!("ProcState:{}", state.arm().unwrap_or("unknown"));

            match state {
                ProcState::Created {
                    create_key, point, ..
                } => {
                    let rank = point.rank();
                    if let Some(old_create_key) = created.insert(rank, create_key.clone()) {
                        tracing::warn!(
                            name,
                            status,
                            rank,
                            "rank {rank} reassigned from {old_create_key} to {create_key}"
                        );
                    }
                    tracing::info!(
                        name,
                        status,
                        rank,
                        "proc with create key {}, rank {}: created",
                        create_key,
                        rank
                    );
                }
                ProcState::Running {
                    create_key,
                    proc_id,
                    mesh_agent,
                    addr,
                } => {
                    let Some(rank) = created.rank(&create_key) else {
                        tracing::warn!(
                            name,
                            %proc_id,
                            status,
                            "proc id {proc_id} with create key {create_key} \
                            is running, but was not created"
                        );
                        continue;
                    };

                    let allocated_proc = AllocatedProc {
                        create_key,
                        proc_id: proc_id.clone(),
                        addr: addr.clone(),
                        mesh_agent: mesh_agent.clone(),
                    };
                    if let Some(old_allocated_proc) = running.insert(*rank, allocated_proc.clone())
                    {
                        tracing::warn!(
                            name,
                            %proc_id,
                            status,
                            rank,
                            "duplicate running notifications for {rank}: \
                            old:{old_allocated_proc}; \
                            new:{allocated_proc}"
                        )
                    }
                    tracing::info!(
                        name,
                        %proc_id,
                        status,
                        "proc {} rank {}: running at addr:{addr} mesh_agent:{mesh_agent}",
                        proc_id,
                        rank
                    );
                }
                // TODO: We should push responsibility to the allocator, which
                // can choose to either provide a new proc or emit a
                // ProcState::Failed to fail the whole allocation.
                ProcState::Stopped { create_key, reason } => {
                    tracing::error!(
                        name,
                        status,
                        "allocation failed for proc with create key {}: {}",
                        create_key,
                        reason
                    );
                    return Err(AllocatorError::Other(anyhow::Error::msg(reason)));
                }
                ProcState::Failed {
                    world_id,
                    description,
                } => {
                    tracing::error!(
                        name,
                        status,
                        "allocation failed for world {}: {}",
                        world_id,
                        description
                    );
                    return Err(AllocatorError::Other(anyhow::Error::msg(description)));
                }
            }
        }

        // We collect all the ranks at this point of completion, so that we can
        // avoid holding Rcs across awaits.
        Ok(running.into_iter().map(Option::unwrap).collect())
    }
}

/// If `addr` is TCP or MetaTLS, use its IP address or hostname to create
/// a new address with the port unspecified.
///
/// For other address types, return the transport's "any" address.
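///
/// A sketch of the intended behavior (the concrete addresses are illustrative):
///
/// ```ignore
/// use std::net::SocketAddr;
///
/// // A TCP address keeps its IP but gets port 0 ("unspecified"):
/// // 127.0.0.1:8080 -> 127.0.0.1:0.
/// let addr = ChannelAddr::Tcp("127.0.0.1:8080".parse::<SocketAddr>().unwrap());
/// let unspecified = with_unspecified_port_or_any(&addr);
///
/// // Other transports fall back to the transport's "any" address.
/// let any = with_unspecified_port_or_any(&ChannelAddr::any(ChannelTransport::Unix));
/// ```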
pub(crate) fn with_unspecified_port_or_any(addr: &ChannelAddr) -> ChannelAddr {
    match addr {
        ChannelAddr::Tcp(socket) => {
            let mut new_socket = socket.clone();
            new_socket.set_port(0);
            ChannelAddr::Tcp(new_socket)
        }
        ChannelAddr::MetaTls(MetaTlsAddr::Socket(socket)) => {
            let mut new_socket = socket.clone();
            new_socket.set_port(0);
            ChannelAddr::MetaTls(MetaTlsAddr::Socket(new_socket))
        }
        ChannelAddr::MetaTls(MetaTlsAddr::Host { hostname, port: _ }) => {
            ChannelAddr::MetaTls(MetaTlsAddr::Host {
                hostname: hostname.clone(),
                port: 0,
            })
        }
        _ => addr.transport().any(),
    }
}

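/// Test utilities for allocs: a spawnable [`TestActor`] and a mockable
/// [`MockAllocWrapper`] used by the alloc test suites.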
pub mod test_utils {
    use std::time::Duration;

    use hyperactor::Actor;
    use hyperactor::Context;
    use hyperactor::Handler;
    use libc::atexit;
    use tokio::sync::broadcast::Receiver;
    use tokio::sync::broadcast::Sender;
    use typeuri::Named;

    use super::*;

    extern "C" fn exit_handler() {
        loop {
            #[allow(clippy::disallowed_methods)]
            std::thread::sleep(Duration::from_mins(1));
        }
    }

    // This can't be defined under a `#[cfg(test)]` because there needs to
    // be an entry in the spawnable actor registry in the executable
    // 'hyperactor_mesh_test_bootstrap' for the `tests::process` actor
    // mesh test suite.
    #[derive(Debug, Default)]
    #[hyperactor::export(
        spawn = true,
        handlers = [
            Wait
        ],
    )]
    pub struct TestActor;

    impl Actor for TestActor {}

    #[derive(Debug, Serialize, Deserialize, Named, Clone)]
    pub struct Wait;

    #[async_trait]
    impl Handler<Wait> for TestActor {
        async fn handle(&mut self, _: &Context<Self>, _: Wait) -> Result<(), anyhow::Error> {
            // SAFETY:
            // This is in order to simulate a process in tests that never exits.
            unsafe {
                atexit(exit_handler);
            }
            Ok(())
        }
    }

    /// Test wrapper around MockAlloc to allow us to block next() calls since
    /// mockall doesn't support returning futures.
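    ///
    /// A sketch of how a test might use the blocking behavior:
    ///
    /// ```ignore
    /// // Let two `next()` calls through, then block until the test signals.
    /// let mut wrapper = MockAllocWrapper::new_block_next(MockAlloc::new(), 2);
    /// let unblock = wrapper.notify_tx();
    /// // ...later, from the test:
    /// unblock.send(()).unwrap();
    /// ```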
    pub struct MockAllocWrapper {
        pub alloc: MockAlloc,
        pub block_next_after: usize,
        notify_tx: Sender<()>,
        notify_rx: Receiver<()>,
        next_unblocked: bool,
    }

    impl MockAllocWrapper {
        pub fn new(alloc: MockAlloc) -> Self {
            Self::new_block_next(alloc, usize::MAX)
        }

        pub fn new_block_next(alloc: MockAlloc, count: usize) -> Self {
            let (tx, rx) = tokio::sync::broadcast::channel(1);
            Self {
                alloc,
                block_next_after: count,
                notify_tx: tx,
                notify_rx: rx,
                next_unblocked: false,
            }
        }

        pub fn notify_tx(&self) -> Sender<()> {
            self.notify_tx.clone()
        }
    }

    #[async_trait]
    impl Alloc for MockAllocWrapper {
        async fn next(&mut self) -> Option<ProcState> {
            match self.block_next_after {
                0 => {
                    if !self.next_unblocked {
                        self.notify_rx.recv().await.unwrap();
                        self.next_unblocked = true;
                    }
                }
                1.. => {
                    self.block_next_after -= 1;
                }
            }

            self.alloc.next().await
        }

        fn spec(&self) -> &AllocSpec {
            self.alloc.spec()
        }

        fn extent(&self) -> &Extent {
            self.alloc.extent()
        }

        fn world_id(&self) -> &WorldId {
            self.alloc.world_id()
        }

        async fn stop(&mut self) -> Result<(), AllocatorError> {
            self.alloc.stop().await
        }
    }
}

#[cfg(test)]
pub(crate) mod testing {
    use core::panic;
    use std::collections::HashMap;
    use std::collections::HashSet;
    use std::time::Duration;

    use hyperactor::Instance;
    use hyperactor::actor::remote::Remote;
    use hyperactor::channel;
    use hyperactor::context;
    use hyperactor::mailbox;
    use hyperactor::mailbox::BoxedMailboxSender;
    use hyperactor::mailbox::DialMailboxRouter;
    use hyperactor::mailbox::IntoBoxedMailboxSender;
    use hyperactor::mailbox::MailboxServer;
    use hyperactor::mailbox::UndeliverableMailboxSender;
    use hyperactor::proc::Proc;
    use hyperactor::reference::Reference;
    use ndslice::extent;
    use tokio::process::Command;

    use super::*;
    use crate::alloc::test_utils::TestActor;
    use crate::alloc::test_utils::Wait;
    use crate::proc_mesh::default_transport;
    use crate::proc_mesh::mesh_agent::GspawnResult;
    use crate::proc_mesh::mesh_agent::MeshAgentMessageClient;

    #[macro_export]
    macro_rules! alloc_test_suite {
        ($allocator:expr) => {
            #[tokio::test]
            async fn test_allocator_basic() {
                $crate::alloc::testing::test_allocator_basic($allocator).await;
            }
        };
    }

    pub(crate) async fn test_allocator_basic(mut allocator: impl Allocator) {
        let extent = extent!(replica = 4);
        let mut alloc = allocator
            .allocate(AllocSpec {
                extent: extent.clone(),
                constraints: Default::default(),
                proc_name: None,
                transport: default_transport(),
                proc_allocation_mode: Default::default(),
            })
            .await
            .unwrap();

        // Get everything up into running state. We require that we get
        // procs 0..4.
        let mut procs = HashMap::new();
        let mut created = HashMap::new();
        let mut running = HashSet::new();
        while running.len() != 4 {
            match alloc.next().await.unwrap() {
                ProcState::Created {
                    create_key, point, ..
                } => {
                    created.insert(create_key, point);
                }
                ProcState::Running {
                    create_key,
                    proc_id,
                    ..
                } => {
                    assert!(running.insert(create_key.clone()));
                    procs.insert(proc_id, created.remove(&create_key).unwrap());
                }
                event => panic!("unexpected event: {:?}", event),
            }
        }

        // We should have complete coverage of all points.
        let points: HashSet<_> = procs.values().collect();
        for x in 0..4 {
            assert!(points.contains(&extent.point(vec![x]).unwrap()));
        }

        // Every proc should belong to the same "world" (alloc).
        let worlds: HashSet<_> = procs.keys().map(|proc_id| proc_id.world_id()).collect();
        assert_eq!(worlds.len(), 1);

        // Now, stop the alloc and make sure it shuts down cleanly.

        alloc.stop().await.unwrap();
        let mut stopped = HashSet::new();
        while let Some(ProcState::Stopped {
            create_key, reason, ..
        }) = alloc.next().await
        {
            assert_eq!(reason, ProcStopReason::Stopped);
            stopped.insert(create_key);
        }
        assert!(alloc.next().await.is_none());
        assert_eq!(stopped, running);
    }

    async fn spawn_proc(
        transport: ChannelTransport,
    ) -> (DialMailboxRouter, Instance<()>, Proc, ChannelAddr) {
        let (router_channel_addr, router_rx) =
            channel::serve(ChannelAddr::any(transport.clone())).unwrap();
        let router =
            DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed());
        router.clone().serve(router_rx);

        let client_proc_id = ProcId::Ranked(WorldId("test_stuck".to_string()), 0);
        let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(transport)).unwrap();
        let client_proc = Proc::new(
            client_proc_id.clone(),
            BoxedMailboxSender::new(router.clone()),
        );
        client_proc.clone().serve(client_rx);
        router.bind(client_proc_id.clone().into(), client_proc_addr);
        (
            router,
            client_proc.instance("test_proc").unwrap().0,
            client_proc,
            router_channel_addr,
        )
    }

    async fn spawn_test_actor(
        rank: usize,
        client_proc: &Proc,
        cx: &impl context::Actor,
        router_channel_addr: ChannelAddr,
        mesh_agent: ActorRef<ProcMeshAgent>,
    ) -> ActorRef<TestActor> {
        let (supervisor, _supervisor_handle) = client_proc.instance("supervisor").unwrap();
        let (supervision_port, _) = supervisor.open_port();
        let (config_handle, _) = cx.mailbox().open_port();
        mesh_agent
            .configure(
                cx,
                rank,
                router_channel_addr,
                Some(supervision_port.bind()),
                HashMap::new(),
                config_handle.bind(),
                false,
            )
            .await
            .unwrap();
        let remote = Remote::collect();
        let actor_type = remote
            .name_of::<TestActor>()
            .ok_or(anyhow::anyhow!("actor not registered"))
            .unwrap()
            .to_string();
        let params = &();
        let (completed_handle, mut completed_receiver) = mailbox::open_port(cx);
        // gspawn the test actor via the mesh agent.
        mesh_agent
            .gspawn(
                cx,
                actor_type,
                "Stuck".to_string(),
                bincode::serialize(params).unwrap(),
                completed_handle.bind(),
            )
            .await
            .unwrap();
        let result = completed_receiver.recv().await.unwrap();
        match result {
            GspawnResult::Success { actor_id, .. } => ActorRef::attest(actor_id),
            GspawnResult::Error(error_msg) => {
                panic!("gspawn failed: {}", error_msg);
            }
        }
    }

    /// To simulate stuckness, we need two things: an actor that is
    /// blocked forever, AND a proc that does not time out when it is
    /// asked to wait for a stuck actor.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_allocator_stuck_task() {
        // Override config with a temporary value for this test.
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(
            hyperactor::config::PROCESS_EXIT_TIMEOUT,
            Duration::from_secs(1),
        );

        let command = Command::new(crate::testresource::get(
            "monarch/hyperactor_mesh/bootstrap",
        ));
        let mut allocator = ProcessAllocator::new(command);
        let mut alloc = allocator
            .allocate(AllocSpec {
                extent: extent! { replica = 1 },
                constraints: Default::default(),
                proc_name: None,
                transport: ChannelTransport::Unix,
                proc_allocation_mode: Default::default(),
            })
            .await
            .unwrap();

        // Get everything up into running state. We require at least one
        // running proc.
        let mut procs = HashMap::new();
        let mut running = HashSet::new();
        let mut actor_ref = None;
        let (router, client, client_proc, router_addr) = spawn_proc(alloc.transport()).await;
        while running.is_empty() {
            match alloc.next().await.unwrap() {
                ProcState::Created {
                    create_key, point, ..
                } => {
                    procs.insert(create_key, point);
                }
                ProcState::Running {
                    create_key,
                    proc_id,
                    mesh_agent,
                    addr,
                } => {
                    router.bind(Reference::Proc(proc_id.clone()), addr.clone());

                    assert!(procs.contains_key(&create_key));
                    assert!(!running.contains(&create_key));

                    actor_ref = Some(
                        spawn_test_actor(0, &client_proc, &client, router_addr, mesh_agent).await,
                    );
                    running.insert(create_key.clone());
                    break;
                }
                event => panic!("unexpected event: {:?}", event),
            }
        }
        assert!(actor_ref.unwrap().send(&client, Wait).is_ok());

        // There is a stuck actor! We should get a watchdog failure.
        alloc.stop().await.unwrap();
        let mut stopped = HashSet::new();
        while let Some(ProcState::Stopped {
            create_key, reason, ..
        }) = alloc.next().await
        {
            assert_eq!(reason, ProcStopReason::Watchdog);
            stopped.insert(create_key);
        }
        assert!(alloc.next().await.is_none());
        assert_eq!(stopped, running);
    }
}