Skip to main content

monarch_hyperactor/
proc.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::hash::DefaultHasher;
10use std::hash::Hash;
11use std::hash::Hasher;
12use std::time::Duration;
13
14use anyhow::Result;
15use hyperactor::RemoteMessage;
16use hyperactor::actor::Signal;
17use hyperactor::channel::ChannelAddr;
18use hyperactor::mailbox::PortReceiver;
19use hyperactor::proc::Instance;
20use hyperactor::proc::Proc;
21use monarch_types::PickledPyObject;
22use pyo3::exceptions::PyRuntimeError;
23use pyo3::exceptions::PyValueError;
24use pyo3::prelude::*;
25use pyo3::types::PyList;
26use pyo3::types::PyType;
27
28use crate::actor::PythonActor;
29use crate::actor::PythonActorHandle;
30use crate::runtime::signal_safe_block_on;
31
/// Wrapper around a proc that provides utilities to implement a python actor.
///
/// Exposed to Python as `Proc` in
/// `monarch._rust_bindings.monarch_hyperactor.proc`.
#[derive(Clone, Debug)]
#[pyclass(
    name = "Proc",
    module = "monarch._rust_bindings.monarch_hyperactor.proc"
)]
pub struct PyProc {
    /// The wrapped hyperactor proc.
    pub(super) inner: Proc,
}
41
42#[pymethods]
43impl PyProc {
44    #[new]
45    #[pyo3(signature = ())]
46    fn new() -> PyResult<Self> {
47        Ok(Self {
48            inner: Proc::isolated(),
49        })
50    }
51
52    #[getter]
53    fn addr(&self) -> String {
54        self.inner.proc_addr().addr().to_string()
55    }
56
57    #[getter]
58    fn name(&self) -> String {
59        self.inner
60            .proc_addr()
61            .label()
62            .map(|l: &hyperactor::id::Label| l.as_str().to_string())
63            .unwrap_or_else(|| self.inner.proc_addr().id().to_string())
64    }
65
66    #[getter]
67    fn id(&self) -> String {
68        self.inner.proc_addr().to_string()
69    }
70
71    fn destroy<'py>(
72        &mut self,
73        timeout_in_secs: u64,
74        py: Python<'py>,
75    ) -> PyResult<Bound<'py, PyList>> {
76        let mut inner = self.inner.clone();
77        let (_stopped, aborted) = signal_safe_block_on(py, async move {
78            inner
79                .destroy_and_wait(Duration::from_secs(timeout_in_secs), "destroy")
80                .await
81                .map_err(|e| PyRuntimeError::new_err(e.to_string()))
82        })??;
83        let aborted_actors = aborted
84            .into_iter()
85            .map(|actor_id| format!("{}", actor_id))
86            .collect::<Vec<_>>();
87        // TODO: i don't think returning this list is of much use for
88        // anything?
89        PyList::new(py, aborted_actors)
90    }
91
92    #[pyo3(signature = (actor, name=None))]
93    fn spawn<'py>(
94        &self,
95        py: Python<'py>,
96        actor: &Bound<'py, PyType>,
97        name: Option<String>,
98    ) -> PyResult<Bound<'py, PyAny>> {
99        let proc = self.inner.clone();
100        let pickled_type = PickledPyObject::pickle(actor.as_any())?;
101        crate::runtime::future_into_py(py, async move {
102            Ok(PythonActorHandle {
103                inner: proc.spawn(
104                    name.as_deref().unwrap_or("anon"),
105                    PythonActor::new(pickled_type, None, None, None)?,
106                )?,
107            })
108        })
109    }
110
111    #[pyo3(signature = (actor, name=None))]
112    fn spawn_blocking<'py>(
113        &self,
114        py: Python<'py>,
115        actor: &Bound<'py, PyType>,
116        name: Option<String>,
117    ) -> PyResult<PythonActorHandle> {
118        let proc = self.inner.clone();
119        let pickled_type = PickledPyObject::pickle(actor.as_any())?;
120        Ok(PythonActorHandle {
121            inner: signal_safe_block_on(py, async move {
122                proc.spawn(
123                    name.as_deref().unwrap_or("anon"),
124                    PythonActor::new(pickled_type, None, None, None)?,
125                )
126            })
127            .map_err(|e| PyRuntimeError::new_err(e.to_string()))??,
128        })
129    }
130}
131
132impl PyProc {
133    pub fn new_from_proc(proc: Proc) -> Self {
134        Self { inner: proc }
135    }
136}
137
/// Python-visible wrapper around a [`hyperactor::ActorAddr`].
///
/// Exposed to Python as `ActorAddr` in
/// `monarch._rust_bindings.monarch_hyperactor.proc`; the class is declared
/// `frozen`.
#[pyclass(
    frozen,
    name = "ActorAddr",
    module = "monarch._rust_bindings.monarch_hyperactor.proc"
)]
#[derive(Clone)]
pub struct PyActorAddr {
    /// The wrapped actor address.
    pub(super) inner: hyperactor::ActorAddr,
}
147
148impl From<hyperactor::ActorAddr> for PyActorAddr {
149    fn from(actor_id: hyperactor::ActorAddr) -> Self {
150        Self { inner: actor_id }
151    }
152}
153
154impl From<PyActorAddr> for hyperactor::ActorAddr {
155    fn from(val: PyActorAddr) -> Self {
156        val.inner
157    }
158}
159
160#[pymethods]
161impl PyActorAddr {
162    #[new]
163    #[pyo3(signature = (*, addr, proc_name, actor_name))]
164    fn new(addr: &str, proc_name: &str, actor_name: &str) -> PyResult<Self> {
165        let addr: ChannelAddr = addr.parse().map_err(|e| {
166            PyValueError::new_err(format!("Failed to parse channel address '{}': {}", addr, e))
167        })?;
168        Ok(Self {
169            inner: hyperactor::ProcAddr::singleton(addr, proc_name).actor_addr(actor_name),
170        })
171    }
172
173    #[staticmethod]
174    fn from_string(actor_id: &str) -> PyResult<Self> {
175        Ok(Self {
176            inner: actor_id.parse().map_err(|e| {
177                PyValueError::new_err(format!(
178                    "Failed to extract actor id from {}: {}",
179                    actor_id, e
180                ))
181            })?,
182        })
183    }
184
185    #[getter]
186    fn addr(&self) -> String {
187        self.inner.proc_addr().addr().to_string()
188    }
189
190    #[getter]
191    fn proc_name(&self) -> String {
192        self.inner
193            .proc_addr()
194            .label()
195            .map(|l: &hyperactor::id::Label| l.as_str().to_string())
196            .unwrap_or_else(|| self.inner.proc_addr().id().to_string())
197    }
198
199    #[getter]
200    fn actor_name(&self) -> String {
201        self.inner
202            .label()
203            .map(|l: &hyperactor::id::Label| l.as_str().to_string())
204            .unwrap_or_else(|| self.inner.uid().to_string())
205    }
206
207    #[getter]
208    fn label(&self) -> Option<String> {
209        self.inner
210            .label()
211            .map(|l: &hyperactor::id::Label| l.as_str().to_string())
212    }
213
214    #[getter]
215    fn proc_label(&self) -> Option<String> {
216        self.inner
217            .proc_addr()
218            .label()
219            .map(|l: &hyperactor::id::Label| l.as_str().to_string())
220    }
221
222    #[getter]
223    fn uid(&self) -> String {
224        self.inner.uid().to_string()
225    }
226
227    #[getter]
228    fn pid(&self) -> String {
229        self.uid()
230    }
231
232    #[getter]
233    fn proc_id(&self) -> String {
234        self.inner.proc_addr().to_string()
235    }
236
237    #[getter]
238    fn is_root(&self) -> bool {
239        self.inner.is_root()
240    }
241
242    fn __str__(&self) -> String {
243        self.inner.to_string()
244    }
245
246    fn __hash__(&self) -> u64 {
247        let mut hasher = DefaultHasher::new();
248        self.inner.to_string().hash(&mut hasher);
249        hasher.finish()
250    }
251
252    fn __eq__(&self, other: &Bound<'_, PyAny>) -> PyResult<bool> {
253        if let Ok(other) = other.extract::<PyActorAddr>() {
254            Ok(self.inner == other.inner)
255        } else {
256            Ok(false)
257        }
258    }
259
260    fn __reduce__<'py>(slf: &Bound<'py, Self>) -> PyResult<(Bound<'py, PyAny>, (String,))> {
261        Ok((slf.getattr("from_string")?, (slf.borrow().__str__(),)))
262    }
263}
264
265impl From<&PyActorAddr> for hyperactor::ActorAddr {
266    fn from(actor_id: &PyActorAddr) -> Self {
267        actor_id.inner.clone()
268    }
269}
270
/// Debug formatting delegates to the wrapped address.
impl std::fmt::Debug for PyActorAddr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // NOTE(review): which `fmt` this resolves to depends on the traits and
        // inherent methods visible for `hyperactor::ActorAddr` — confirm it is
        // the intended formatter.
        self.inner.fmt(f)
    }
}
276
/// Lifecycle state stored in the `status` field of [`InstanceWrapper`].
#[derive(Clone, Copy, PartialEq, Eq)]
enum InstanceStatus {
    /// Initial state assigned by `InstanceWrapper::new`.
    Running,
    /// Set once a stop signal has been observed or `drain_and_stop` has run.
    Stopped,
}
282
/// Wrapper around a [`wirevalue::Any`] that allows it to be returned to
/// Python and passed to Python-based detached actors, which can then send it
/// to other actors.
#[pyclass(
    frozen,
    name = "Serialized",
    module = "monarch._rust_bindings.monarch_hyperactor.proc"
)]
#[derive(Debug)]
pub struct PySerialized {
    /// The serialized message payload.
    inner: wirevalue::Any,
    /// The handler port for this message type.
    port: u64,
}
296
297impl PySerialized {
298    pub fn new<M: RemoteMessage>(message: &M) -> PyResult<Self> {
299        Ok(Self {
300            inner: wirevalue::Any::serialize(message).map_err(|err| {
301                PyRuntimeError::new_err(format!(
302                    "failed to serialize message of type {} to Any: {}",
303                    std::any::type_name::<M>(),
304                    err
305                ))
306            })?,
307            port: M::port(),
308        })
309    }
310
311    pub fn deserialized<M: RemoteMessage>(&self) -> PyResult<M> {
312        self.inner.deserialized().map_err(|err| {
313            PyRuntimeError::new_err(format!("failed to deserialize message: {}", err))
314        })
315    }
316
317    /// The handler port for this message type.
318    pub fn port(&self) -> u64 {
319        self.port
320    }
321}
322
/// Wrapper around an instance of an actor that provides utilities to implement
/// a python actor. This helps by allowing users to specialize the actor to the
/// message type they want to handle.
pub struct InstanceWrapper<M: RemoteMessage> {
    /// The client instance obtained from the proc.
    instance: Instance<()>,
    /// Receiver for messages of type `M` bound on the instance.
    message_receiver: PortReceiver<M>,
    /// Receiver for [`Signal`]s bound on the instance.
    signal_receiver: PortReceiver<Signal>,
    /// Whether the wrapper still considers the actor running.
    status: InstanceStatus,
    /// Address of the instance, captured at construction time.
    actor_id: hyperactor::ActorAddr,
}
333
334impl<M: RemoteMessage> InstanceWrapper<M> {
335    pub fn new(proc: &PyProc, actor_name: &str) -> Result<Self> {
336        let instance = proc.inner.client(actor_name)?.0;
337        // TEMPORARY: remove after using fixed handler ports.
338        let (_handler_port, message_receiver) = instance.bind_handler_port::<M>();
339
340        let (_signal_port, signal_receiver) = instance.bind_handler_port::<Signal>();
341
342        let actor_id = instance.self_addr().clone();
343
344        Ok(Self {
345            instance,
346            message_receiver,
347            signal_receiver,
348            status: InstanceStatus::Running,
349            actor_id,
350        })
351    }
352
353    /// Send a message to any actor. It is the responsibility of the caller to ensure the right
354    /// payload accepted by the target actor has been serialized and provided to this function.
355    pub fn send(&self, actor_id: &PyActorAddr, message: &PySerialized) -> PyResult<()> {
356        hyperactor::internal_macro_support::tracing::debug!(
357            name = "py_send_message",
358            actor_id =
359                hyperactor::internal_macro_support::tracing::field::display(self.actor_addr()),
360            receiver_actor_id = tracing::field::display(&actor_id.inner),
361            ?message,
362        );
363        actor_id
364            .inner
365            .port_addr(message.port().into())
366            .send(&self.instance, message.inner.clone());
367        Ok(())
368    }
369
370    /// Make sure the actor is running in detached mode and is alive.
371    fn ensure_detached_and_alive(&mut self) -> Result<()> {
372        anyhow::ensure!(
373            self.status == InstanceStatus::Running,
374            "actor is not running"
375        );
376
377        // This is a little weird as we are potentially stopping before responding to messages
378        // but in reality if we receive stop signal and not stop and drain in most cases its
379        // probably ok to stop early.
380        // Also an implicit assumption here is that is the signal is stop and drain we allow things
381        // to continue as there will hopefully not be new messages coming in. But need a proper draining
382        // flow for this.
383        // TODO: T208289078
384        let signals = self.signal_receiver.drain();
385        if signals
386            .into_iter()
387            .any(|sig| matches!(sig, Signal::Stop(_)))
388        {
389            self.status = InstanceStatus::Stopped;
390            anyhow::bail!("actor has been stopped");
391        }
392
393        Ok(())
394    }
395
396    /// Get the next message from the queue. It will wait until a message is received
397    /// or the timeout is reached in which case it will return None.
398    #[hyperactor::instrument(level = "trace", fields(actor_id = hyperactor::internal_macro_support::tracing::field::display(self.actor_addr())))]
399    pub async fn next_message(&mut self, timeout_msec: Option<u64>) -> Result<Option<M>> {
400        hyperactor::declare_static_timer!(
401            PY_NEXT_MESSAGE_TIMER,
402            "py_next_message",
403            hyperactor_telemetry::TimeUnit::Nanos
404        );
405        let _ = PY_NEXT_MESSAGE_TIMER
406            .start(hyperactor::kv_pairs!("actor_id" => self.actor_addr().to_string(), "mode" => match timeout_msec{
407                None => "blocking",
408                Some(0) => "polling",
409                Some(_) => "blocking_with_timeout",
410            }));
411        self.ensure_detached_and_alive()?;
412        match timeout_msec {
413            // Blocking wait for next message.
414            None => {
415                self.message_receiver.recv().await.map(Some)},
416            Some(0) => {
417                // Non-blocking.
418                // Try to get next message without waiting.
419                self.message_receiver.try_recv()
420            }
421            Some(timeout_msec) => {
422                // Blocking wait with a timeout.
423                match tokio::time::timeout(
424                    Duration::from_millis(timeout_msec),
425                    self.message_receiver.recv(),
426                )
427                .await
428                {
429                    Ok(output) => output.map(Some),
430                    Err(_) => Ok(None), // Timeout reached
431                }
432            }
433        }
434        .map_err(|err| err.into())
435        .inspect_err(|err| {
436            hyperactor::metrics::ACTOR_MESSAGE_RECEIVE_ERRORS.add(1, hyperactor::kv_pairs!("actor_id" => self.actor_addr().to_string()));
437            tracing::error!(err=?err, actor_id=%self.actor_addr(), "unable to receive next py message");
438        })
439        .inspect(|_|{
440            hyperactor::metrics::ACTOR_MESSAGES_RECEIVED.add(1, hyperactor::kv_pairs!("actor_id" => self.actor_addr().to_string()));
441        })
442    }
443
444    /// Put the actor in stopped mode and return any messages that were received.
445    #[hyperactor::instrument(fields(actor_id=hyperactor::internal_macro_support::tracing::field::display(self.actor_addr())))]
446    pub fn drain_and_stop(&mut self) -> Result<Vec<M>> {
447        self.ensure_detached_and_alive()?;
448        let messages: Vec<M> = self.message_receiver.drain().into_iter().collect();
449        tracing::info!("stopping the client actor in Python client");
450        self.status = InstanceStatus::Stopped;
451        Ok(messages)
452    }
453
454    pub fn instance(&self) -> &Instance<()> {
455        &self.instance
456    }
457
458    pub fn actor_addr(&self) -> &hyperactor::ActorAddr {
459        &self.actor_id
460    }
461}
462
463pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
464    hyperactor_mod.add_class::<PyProc>()?;
465    hyperactor_mod.add_class::<PyActorAddr>()?;
466    hyperactor_mod.add_class::<PySerialized>()?;
467    Ok(())
468}