monarch_hyperactor/
proc.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::hash::DefaultHasher;
10use std::hash::Hash;
11use std::hash::Hasher;
12use std::time::Duration;
13
14use anyhow::Result;
15use hyperactor::RemoteMessage;
16use hyperactor::actor::Signal;
17use hyperactor::channel::ChannelAddr;
18use hyperactor::mailbox::PortReceiver;
19use hyperactor::proc::Instance;
20use hyperactor::proc::Proc;
21use hyperactor::reference;
22use monarch_types::PickledPyObject;
23use pyo3::exceptions::PyRuntimeError;
24use pyo3::exceptions::PyValueError;
25use pyo3::prelude::*;
26use pyo3::types::PyList;
27use pyo3::types::PyType;
28
29use crate::actor::PythonActor;
30use crate::actor::PythonActorHandle;
31use crate::runtime::signal_safe_block_on;
32
/// Wrapper around a proc that provides utilities to implement a python actor.
///
/// Exposed to Python as `Proc` in the
/// `monarch._rust_bindings.monarch_hyperactor.proc` module.
#[derive(Clone, Debug)]
#[pyclass(
    name = "Proc",
    module = "monarch._rust_bindings.monarch_hyperactor.proc"
)]
pub struct PyProc {
    /// The wrapped hyperactor proc; accessible from the parent module.
    pub(super) inner: Proc,
}
42
43#[pymethods]
44impl PyProc {
45    #[new]
46    #[pyo3(signature = ())]
47    fn new() -> PyResult<Self> {
48        Ok(Self {
49            inner: Proc::local(),
50        })
51    }
52
53    #[getter]
54    fn addr(&self) -> String {
55        self.inner.proc_id().addr().to_string()
56    }
57
58    #[getter]
59    fn name(&self) -> String {
60        self.inner.proc_id().name().to_string()
61    }
62
63    #[getter]
64    fn id(&self) -> String {
65        self.inner.proc_id().to_string()
66    }
67
68    fn destroy<'py>(
69        &mut self,
70        timeout_in_secs: u64,
71        py: Python<'py>,
72    ) -> PyResult<Bound<'py, PyList>> {
73        let mut inner = self.inner.clone();
74        let (_stopped, aborted) = signal_safe_block_on(py, async move {
75            inner
76                .destroy_and_wait::<()>(Duration::from_secs(timeout_in_secs), None, "destroy")
77                .await
78                .map_err(|e| PyRuntimeError::new_err(e.to_string()))
79        })??;
80        let aborted_actors = aborted
81            .into_iter()
82            .map(|actor_id| format!("{}", actor_id))
83            .collect::<Vec<_>>();
84        // TODO: i don't think returning this list is of much use for
85        // anything?
86        PyList::new(py, aborted_actors)
87    }
88
89    #[pyo3(signature = (actor, name=None))]
90    fn spawn<'py>(
91        &self,
92        py: Python<'py>,
93        actor: &Bound<'py, PyType>,
94        name: Option<String>,
95    ) -> PyResult<Bound<'py, PyAny>> {
96        let proc = self.inner.clone();
97        let pickled_type = PickledPyObject::pickle(actor.as_any())?;
98        crate::runtime::future_into_py(py, async move {
99            Ok(PythonActorHandle {
100                inner: proc.spawn(
101                    name.as_deref().unwrap_or("anon"),
102                    PythonActor::new(pickled_type, None, None)?,
103                )?,
104            })
105        })
106    }
107
108    #[pyo3(signature = (actor, name=None))]
109    fn spawn_blocking<'py>(
110        &self,
111        py: Python<'py>,
112        actor: &Bound<'py, PyType>,
113        name: Option<String>,
114    ) -> PyResult<PythonActorHandle> {
115        let proc = self.inner.clone();
116        let pickled_type = PickledPyObject::pickle(actor.as_any())?;
117        Ok(PythonActorHandle {
118            inner: signal_safe_block_on(py, async move {
119                proc.spawn(
120                    name.as_deref().unwrap_or("anon"),
121                    PythonActor::new(pickled_type, None, None)?,
122                )
123            })
124            .map_err(|e| PyRuntimeError::new_err(e.to_string()))??,
125        })
126    }
127}
128
129impl PyProc {
130    pub fn new_from_proc(proc: Proc) -> Self {
131        Self { inner: proc }
132    }
133}
134
/// Python-facing wrapper around a [`reference::ActorId`].
///
/// Exposed to Python as the frozen class `ActorId` in the
/// `monarch._rust_bindings.monarch_hyperactor.proc` module.
#[pyclass(
    frozen,
    name = "ActorId",
    module = "monarch._rust_bindings.monarch_hyperactor.proc"
)]
#[derive(Clone)]
pub struct PyActorId {
    /// The wrapped actor id; accessible from the parent module.
    pub(super) inner: reference::ActorId,
}
144
145impl From<reference::ActorId> for PyActorId {
146    fn from(actor_id: reference::ActorId) -> Self {
147        Self { inner: actor_id }
148    }
149}
150
151impl From<PyActorId> for reference::ActorId {
152    fn from(val: PyActorId) -> Self {
153        val.inner
154    }
155}
156
157#[pymethods]
158impl PyActorId {
159    #[new]
160    #[pyo3(signature = (*, addr, proc_name, actor_name, pid = 0))]
161    fn new(addr: &str, proc_name: &str, actor_name: &str, pid: reference::Index) -> PyResult<Self> {
162        let addr: ChannelAddr = addr.parse().map_err(|e| {
163            PyValueError::new_err(format!("Failed to parse channel address '{}': {}", addr, e))
164        })?;
165        Ok(Self {
166            inner: reference::ActorId::new(
167                reference::ProcId::with_name(addr, proc_name),
168                actor_name,
169                pid,
170            ),
171        })
172    }
173
174    #[staticmethod]
175    fn from_string(actor_id: &str) -> PyResult<Self> {
176        Ok(Self {
177            inner: actor_id.parse().map_err(|e| {
178                PyValueError::new_err(format!(
179                    "Failed to extract actor id from {}: {}",
180                    actor_id, e
181                ))
182            })?,
183        })
184    }
185
186    #[getter]
187    fn addr(&self) -> String {
188        self.inner.proc_id().addr().to_string()
189    }
190
191    #[getter]
192    fn proc_name(&self) -> String {
193        self.inner.proc_id().name().to_string()
194    }
195
196    #[getter]
197    fn actor_name(&self) -> String {
198        self.inner.name().to_string()
199    }
200
201    #[getter]
202    fn pid(&self) -> reference::Index {
203        self.inner.pid()
204    }
205
206    #[getter]
207    fn proc_id(&self) -> String {
208        self.inner.proc_id().to_string()
209    }
210
211    fn __str__(&self) -> String {
212        self.inner.to_string()
213    }
214
215    fn __hash__(&self) -> u64 {
216        let mut hasher = DefaultHasher::new();
217        self.inner.to_string().hash(&mut hasher);
218        hasher.finish()
219    }
220
221    fn __eq__(&self, other: &Bound<'_, PyAny>) -> PyResult<bool> {
222        if let Ok(other) = other.extract::<PyActorId>() {
223            Ok(self.inner == other.inner)
224        } else {
225            Ok(false)
226        }
227    }
228
229    fn __reduce__<'py>(slf: &Bound<'py, Self>) -> PyResult<(Bound<'py, PyAny>, (String,))> {
230        Ok((slf.getattr("from_string")?, (slf.borrow().__str__(),)))
231    }
232}
233
234impl From<&PyActorId> for reference::ActorId {
235    fn from(actor_id: &PyActorId) -> Self {
236        actor_id.inner.clone()
237    }
238}
239
/// Debug formatting for the wrapper is delegated to the wrapped actor id.
impl std::fmt::Debug for PyActorId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Forward to `inner`'s `fmt`.
        // NOTE(review): which `fmt` this resolves to depends on the traits /
        // inherent methods in scope for `ActorId` — presumably its `Debug`
        // impl; confirm.
        self.inner.fmt(f)
    }
}
245
/// Lifecycle state tracked for a wrapped actor instance.
#[derive(PartialEq, Eq, Clone, Copy)]
enum InstanceStatus {
    /// The instance is live and may receive messages.
    Running,
    /// The instance has been stopped and must not be used further.
    Stopped,
}
251
/// Wrapper around a [`wirevalue::Any`] that allows it to be returned to
/// Python and passed to Python-based detached actors, which can then send it
/// on to other actors.
///
/// Exposed to Python as the frozen class `Serialized` in the
/// `monarch._rust_bindings.monarch_hyperactor.proc` module.
#[pyclass(
    frozen,
    name = "Serialized",
    module = "monarch._rust_bindings.monarch_hyperactor.proc"
)]
#[derive(Debug)]
pub struct PySerialized {
    /// The serialized message payload.
    inner: wirevalue::Any,
    /// The message port (type) of the message.
    port: u64,
}
265
266impl PySerialized {
267    pub fn new<M: RemoteMessage>(message: &M) -> PyResult<Self> {
268        Ok(Self {
269            inner: wirevalue::Any::serialize(message).map_err(|err| {
270                PyRuntimeError::new_err(format!(
271                    "failed to serialize message of type {} to Any: {}",
272                    std::any::type_name::<M>(),
273                    err
274                ))
275            })?,
276            port: M::port(),
277        })
278    }
279
280    pub fn deserialized<M: RemoteMessage>(&self) -> PyResult<M> {
281        self.inner.deserialized().map_err(|err| {
282            PyRuntimeError::new_err(format!("failed to deserialize message: {}", err))
283        })
284    }
285
286    /// The message port (type) of the message.
287    pub fn port(&self) -> u64 {
288        self.port
289    }
290}
291
/// Wrapper around an instance of an actor that provides utilities to implement
/// a python actor. This helps by allowing users to specialize the actor to the
/// message type they want to handle.
pub struct InstanceWrapper<M: RemoteMessage> {
    /// The underlying actor instance; also used as the sender when sending.
    instance: Instance<()>,
    /// Receiver for messages of type `M` delivered to this instance.
    message_receiver: PortReceiver<M>,
    /// Receiver for [`Signal`]s delivered to this instance.
    signal_receiver: PortReceiver<Signal>,
    /// Whether the wrapper still considers the instance running.
    status: InstanceStatus,
    /// The instance's own actor id, captured at construction time.
    actor_id: reference::ActorId,
}
302
303impl<M: RemoteMessage> InstanceWrapper<M> {
304    pub fn new(proc: &PyProc, actor_name: &str) -> Result<Self> {
305        let instance = proc.inner.instance(actor_name)?.0;
306        // TEMPORARY: remove after using fixed message ports.
307        let (_message_port, message_receiver) = instance.bind_actor_port::<M>();
308
309        let (_signal_port, signal_receiver) = instance.bind_actor_port::<Signal>();
310
311        let actor_id = instance.self_id().clone();
312
313        Ok(Self {
314            instance,
315            message_receiver,
316            signal_receiver,
317            status: InstanceStatus::Running,
318            actor_id,
319        })
320    }
321
322    /// Send a message to any actor. It is the responsibility of the caller to ensure the right
323    /// payload accepted by the target actor has been serialized and provided to this function.
324    pub fn send(&self, actor_id: &PyActorId, message: &PySerialized) -> PyResult<()> {
325        hyperactor::internal_macro_support::tracing::debug!(
326            name = "py_send_message",
327            actor_id = hyperactor::internal_macro_support::tracing::field::display(self.actor_id()),
328            receiver_actor_id = tracing::field::display(&actor_id.inner),
329            ?message,
330        );
331        actor_id
332            .inner
333            .port_id(message.port())
334            .send(&self.instance, message.inner.clone());
335        Ok(())
336    }
337
338    /// Make sure the actor is running in detached mode and is alive.
339    fn ensure_detached_and_alive(&mut self) -> Result<()> {
340        anyhow::ensure!(
341            self.status == InstanceStatus::Running,
342            "actor is not running"
343        );
344
345        // This is a little weird as we are potentially stopping before responding to messages
346        // but in reality if we receive stop signal and not stop and drain in most cases its
347        // probably ok to stop early.
348        // Also an implicit assumption here is that is the signal is stop and drain we allow things
349        // to continue as there will hopefully not be new messages coming in. But need a proper draining
350        // flow for this.
351        // TODO: T208289078
352        let signals = self.signal_receiver.drain();
353        if signals
354            .into_iter()
355            .any(|sig| matches!(sig, Signal::Stop(_)))
356        {
357            self.status = InstanceStatus::Stopped;
358            anyhow::bail!("actor has been stopped");
359        }
360
361        Ok(())
362    }
363
364    /// Get the next message from the queue. It will wait until a message is received
365    /// or the timeout is reached in which case it will return None.
366    #[hyperactor::instrument(level = "trace", fields(actor_id = hyperactor::internal_macro_support::tracing::field::display(self.actor_id())))]
367    pub async fn next_message(&mut self, timeout_msec: Option<u64>) -> Result<Option<M>> {
368        hyperactor::declare_static_timer!(
369            PY_NEXT_MESSAGE_TIMER,
370            "py_next_message",
371            hyperactor_telemetry::TimeUnit::Nanos
372        );
373        let _ = PY_NEXT_MESSAGE_TIMER
374            .start(hyperactor::kv_pairs!("actor_id" => self.actor_id().to_string(), "mode" => match timeout_msec{
375                None => "blocking",
376                Some(0) => "polling",
377                Some(_) => "blocking_with_timeout",
378            }));
379        self.ensure_detached_and_alive()?;
380        match timeout_msec {
381            // Blocking wait for next message.
382            None => {
383                self.message_receiver.recv().await.map(Some)},
384            Some(0) => {
385                // Non-blocking.
386                // Try to get next message without waiting.
387                self.message_receiver.try_recv()
388            }
389            Some(timeout_msec) => {
390                // Blocking wait with a timeout.
391                match tokio::time::timeout(
392                    Duration::from_millis(timeout_msec),
393                    self.message_receiver.recv(),
394                )
395                .await
396                {
397                    Ok(output) => output.map(Some),
398                    Err(_) => Ok(None), // Timeout reached
399                }
400            }
401        }
402        .map_err(|err| err.into())
403        .inspect_err(|err| {
404            hyperactor::metrics::ACTOR_MESSAGE_RECEIVE_ERRORS.add(1, hyperactor::kv_pairs!("actor_id" => self.actor_id().to_string()));
405            tracing::error!(err=?err, actor_id=%self.actor_id(), "unable to receive next py message");
406        })
407        .inspect(|_|{
408            hyperactor::metrics::ACTOR_MESSAGES_RECEIVED.add(1, hyperactor::kv_pairs!("actor_id" => self.actor_id().to_string()));
409        })
410    }
411
412    /// Put the actor in stopped mode and return any messages that were received.
413    #[hyperactor::instrument(fields(actor_id=hyperactor::internal_macro_support::tracing::field::display(self.actor_id())))]
414    pub fn drain_and_stop(&mut self) -> Result<Vec<M>> {
415        self.ensure_detached_and_alive()?;
416        let messages: Vec<M> = self.message_receiver.drain().into_iter().collect();
417        tracing::info!("stopping the client actor in Python client");
418        self.status = InstanceStatus::Stopped;
419        Ok(messages)
420    }
421
422    pub fn instance(&self) -> &Instance<()> {
423        &self.instance
424    }
425
426    pub fn actor_id(&self) -> &reference::ActorId {
427        &self.actor_id
428    }
429}
430
431pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
432    hyperactor_mod.add_class::<PyProc>()?;
433    hyperactor_mod.add_class::<PyActorId>()?;
434    hyperactor_mod.add_class::<PySerialized>()?;
435    Ok(())
436}