1use std::hash::DefaultHasher;
10use std::hash::Hash;
11use std::hash::Hasher;
12use std::time::Duration;
13
14use anyhow::Result;
15use hyperactor::RemoteMessage;
16use hyperactor::actor::Signal;
17use hyperactor::channel::ChannelAddr;
18use hyperactor::mailbox::PortReceiver;
19use hyperactor::proc::Instance;
20use hyperactor::proc::Proc;
21use hyperactor::reference;
22use monarch_types::PickledPyObject;
23use pyo3::exceptions::PyRuntimeError;
24use pyo3::exceptions::PyValueError;
25use pyo3::prelude::*;
26use pyo3::types::PyList;
27use pyo3::types::PyType;
28
29use crate::actor::PythonActor;
30use crate::actor::PythonActorHandle;
31use crate::runtime::signal_safe_block_on;
32
/// Python-visible handle (exported to Python as `Proc`) around a hyperactor
/// [`Proc`].
#[derive(Clone, Debug)]
#[pyclass(
    name = "Proc",
    module = "monarch._rust_bindings.monarch_hyperactor.proc"
)]
pub struct PyProc {
    /// The wrapped hyperactor proc.
    pub(super) inner: Proc,
}
42
43#[pymethods]
44impl PyProc {
45 #[new]
46 #[pyo3(signature = ())]
47 fn new() -> PyResult<Self> {
48 Ok(Self {
49 inner: Proc::local(),
50 })
51 }
52
53 #[getter]
54 fn addr(&self) -> String {
55 self.inner.proc_id().addr().to_string()
56 }
57
58 #[getter]
59 fn name(&self) -> String {
60 self.inner.proc_id().name().to_string()
61 }
62
63 #[getter]
64 fn id(&self) -> String {
65 self.inner.proc_id().to_string()
66 }
67
68 fn destroy<'py>(
69 &mut self,
70 timeout_in_secs: u64,
71 py: Python<'py>,
72 ) -> PyResult<Bound<'py, PyList>> {
73 let mut inner = self.inner.clone();
74 let (_stopped, aborted) = signal_safe_block_on(py, async move {
75 inner
76 .destroy_and_wait::<()>(Duration::from_secs(timeout_in_secs), None, "destroy")
77 .await
78 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
79 })??;
80 let aborted_actors = aborted
81 .into_iter()
82 .map(|actor_id| format!("{}", actor_id))
83 .collect::<Vec<_>>();
84 PyList::new(py, aborted_actors)
87 }
88
89 #[pyo3(signature = (actor, name=None))]
90 fn spawn<'py>(
91 &self,
92 py: Python<'py>,
93 actor: &Bound<'py, PyType>,
94 name: Option<String>,
95 ) -> PyResult<Bound<'py, PyAny>> {
96 let proc = self.inner.clone();
97 let pickled_type = PickledPyObject::pickle(actor.as_any())?;
98 crate::runtime::future_into_py(py, async move {
99 Ok(PythonActorHandle {
100 inner: proc.spawn(
101 name.as_deref().unwrap_or("anon"),
102 PythonActor::new(pickled_type, None, None)?,
103 )?,
104 })
105 })
106 }
107
108 #[pyo3(signature = (actor, name=None))]
109 fn spawn_blocking<'py>(
110 &self,
111 py: Python<'py>,
112 actor: &Bound<'py, PyType>,
113 name: Option<String>,
114 ) -> PyResult<PythonActorHandle> {
115 let proc = self.inner.clone();
116 let pickled_type = PickledPyObject::pickle(actor.as_any())?;
117 Ok(PythonActorHandle {
118 inner: signal_safe_block_on(py, async move {
119 proc.spawn(
120 name.as_deref().unwrap_or("anon"),
121 PythonActor::new(pickled_type, None, None)?,
122 )
123 })
124 .map_err(|e| PyRuntimeError::new_err(e.to_string()))??,
125 })
126 }
127}
128
129impl PyProc {
130 pub fn new_from_proc(proc: Proc) -> Self {
131 Self { inner: proc }
132 }
133}
134
/// Python-visible, immutable (`frozen`) wrapper, exported to Python as
/// `ActorId`, around a [`reference::ActorId`].
#[pyclass(
    frozen,
    name = "ActorId",
    module = "monarch._rust_bindings.monarch_hyperactor.proc"
)]
#[derive(Clone)]
pub struct PyActorId {
    /// The wrapped actor id.
    pub(super) inner: reference::ActorId,
}
144
145impl From<reference::ActorId> for PyActorId {
146 fn from(actor_id: reference::ActorId) -> Self {
147 Self { inner: actor_id }
148 }
149}
150
151impl From<PyActorId> for reference::ActorId {
152 fn from(val: PyActorId) -> Self {
153 val.inner
154 }
155}
156
157#[pymethods]
158impl PyActorId {
159 #[new]
160 #[pyo3(signature = (*, addr, proc_name, actor_name, pid = 0))]
161 fn new(addr: &str, proc_name: &str, actor_name: &str, pid: reference::Index) -> PyResult<Self> {
162 let addr: ChannelAddr = addr.parse().map_err(|e| {
163 PyValueError::new_err(format!("Failed to parse channel address '{}': {}", addr, e))
164 })?;
165 Ok(Self {
166 inner: reference::ActorId::new(
167 reference::ProcId::with_name(addr, proc_name),
168 actor_name,
169 pid,
170 ),
171 })
172 }
173
174 #[staticmethod]
175 fn from_string(actor_id: &str) -> PyResult<Self> {
176 Ok(Self {
177 inner: actor_id.parse().map_err(|e| {
178 PyValueError::new_err(format!(
179 "Failed to extract actor id from {}: {}",
180 actor_id, e
181 ))
182 })?,
183 })
184 }
185
186 #[getter]
187 fn addr(&self) -> String {
188 self.inner.proc_id().addr().to_string()
189 }
190
191 #[getter]
192 fn proc_name(&self) -> String {
193 self.inner.proc_id().name().to_string()
194 }
195
196 #[getter]
197 fn actor_name(&self) -> String {
198 self.inner.name().to_string()
199 }
200
201 #[getter]
202 fn pid(&self) -> reference::Index {
203 self.inner.pid()
204 }
205
206 #[getter]
207 fn proc_id(&self) -> String {
208 self.inner.proc_id().to_string()
209 }
210
211 fn __str__(&self) -> String {
212 self.inner.to_string()
213 }
214
215 fn __hash__(&self) -> u64 {
216 let mut hasher = DefaultHasher::new();
217 self.inner.to_string().hash(&mut hasher);
218 hasher.finish()
219 }
220
221 fn __eq__(&self, other: &Bound<'_, PyAny>) -> PyResult<bool> {
222 if let Ok(other) = other.extract::<PyActorId>() {
223 Ok(self.inner == other.inner)
224 } else {
225 Ok(false)
226 }
227 }
228
229 fn __reduce__<'py>(slf: &Bound<'py, Self>) -> PyResult<(Bound<'py, PyAny>, (String,))> {
230 Ok((slf.getattr("from_string")?, (slf.borrow().__str__(),)))
231 }
232}
233
234impl From<&PyActorId> for reference::ActorId {
235 fn from(actor_id: &PyActorId) -> Self {
236 actor_id.inner.clone()
237 }
238}
239
240impl std::fmt::Debug for PyActorId {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 self.inner.fmt(f)
243 }
244}
245
/// Lifecycle state tracked by `InstanceWrapper`.
#[derive(Clone, Copy, PartialEq, Eq)]
enum InstanceStatus {
    /// Initial state set by `InstanceWrapper::new`; receive calls are allowed.
    Running,
    /// Set once a stop signal has been observed or `drain_and_stop` was called.
    Stopped,
}
251
/// Python-visible, immutable (`frozen`) container, exported to Python as
/// `Serialized`: a serialized message together with a port number.
#[pyclass(
    frozen,
    name = "Serialized",
    module = "monarch._rust_bindings.monarch_hyperactor.proc"
)]
#[derive(Debug)]
pub struct PySerialized {
    /// The message encoded as a `wirevalue::Any`.
    inner: wirevalue::Any,
    /// Port value taken from `M::port()` of the message type at construction.
    port: u64,
}
265
266impl PySerialized {
267 pub fn new<M: RemoteMessage>(message: &M) -> PyResult<Self> {
268 Ok(Self {
269 inner: wirevalue::Any::serialize(message).map_err(|err| {
270 PyRuntimeError::new_err(format!(
271 "failed to serialize message of type {} to Any: {}",
272 std::any::type_name::<M>(),
273 err
274 ))
275 })?,
276 port: M::port(),
277 })
278 }
279
280 pub fn deserialized<M: RemoteMessage>(&self) -> PyResult<M> {
281 self.inner.deserialized().map_err(|err| {
282 PyRuntimeError::new_err(format!("failed to deserialize message: {}", err))
283 })
284 }
285
286 pub fn port(&self) -> u64 {
288 self.port
289 }
290}
291
/// Bundles an `Instance<()>` with receivers for messages of type `M` and for
/// `Signal`s, plus a running/stopped flag.
pub struct InstanceWrapper<M: RemoteMessage> {
    /// Instance obtained from the proc; passed as the sender in `send`.
    instance: Instance<()>,
    /// Receiver bound for messages of type `M`.
    message_receiver: PortReceiver<M>,
    /// Receiver bound for `Signal`s; drained before each receive to detect a stop.
    signal_receiver: PortReceiver<Signal>,
    /// Whether this wrapper still accepts receive calls.
    status: InstanceStatus,
    /// Clone of the instance's own actor id, captured at construction.
    actor_id: reference::ActorId,
}
302
303impl<M: RemoteMessage> InstanceWrapper<M> {
304 pub fn new(proc: &PyProc, actor_name: &str) -> Result<Self> {
305 let instance = proc.inner.instance(actor_name)?.0;
306 let (_message_port, message_receiver) = instance.bind_actor_port::<M>();
308
309 let (_signal_port, signal_receiver) = instance.bind_actor_port::<Signal>();
310
311 let actor_id = instance.self_id().clone();
312
313 Ok(Self {
314 instance,
315 message_receiver,
316 signal_receiver,
317 status: InstanceStatus::Running,
318 actor_id,
319 })
320 }
321
322 pub fn send(&self, actor_id: &PyActorId, message: &PySerialized) -> PyResult<()> {
325 hyperactor::internal_macro_support::tracing::debug!(
326 name = "py_send_message",
327 actor_id = hyperactor::internal_macro_support::tracing::field::display(self.actor_id()),
328 receiver_actor_id = tracing::field::display(&actor_id.inner),
329 ?message,
330 );
331 actor_id
332 .inner
333 .port_id(message.port())
334 .send(&self.instance, message.inner.clone());
335 Ok(())
336 }
337
338 fn ensure_detached_and_alive(&mut self) -> Result<()> {
340 anyhow::ensure!(
341 self.status == InstanceStatus::Running,
342 "actor is not running"
343 );
344
345 let signals = self.signal_receiver.drain();
353 if signals
354 .into_iter()
355 .any(|sig| matches!(sig, Signal::Stop(_)))
356 {
357 self.status = InstanceStatus::Stopped;
358 anyhow::bail!("actor has been stopped");
359 }
360
361 Ok(())
362 }
363
364 #[hyperactor::instrument(level = "trace", fields(actor_id = hyperactor::internal_macro_support::tracing::field::display(self.actor_id())))]
367 pub async fn next_message(&mut self, timeout_msec: Option<u64>) -> Result<Option<M>> {
368 hyperactor::declare_static_timer!(
369 PY_NEXT_MESSAGE_TIMER,
370 "py_next_message",
371 hyperactor_telemetry::TimeUnit::Nanos
372 );
373 let _ = PY_NEXT_MESSAGE_TIMER
374 .start(hyperactor::kv_pairs!("actor_id" => self.actor_id().to_string(), "mode" => match timeout_msec{
375 None => "blocking",
376 Some(0) => "polling",
377 Some(_) => "blocking_with_timeout",
378 }));
379 self.ensure_detached_and_alive()?;
380 match timeout_msec {
381 None => {
383 self.message_receiver.recv().await.map(Some)},
384 Some(0) => {
385 self.message_receiver.try_recv()
388 }
389 Some(timeout_msec) => {
390 match tokio::time::timeout(
392 Duration::from_millis(timeout_msec),
393 self.message_receiver.recv(),
394 )
395 .await
396 {
397 Ok(output) => output.map(Some),
398 Err(_) => Ok(None), }
400 }
401 }
402 .map_err(|err| err.into())
403 .inspect_err(|err| {
404 hyperactor::metrics::ACTOR_MESSAGE_RECEIVE_ERRORS.add(1, hyperactor::kv_pairs!("actor_id" => self.actor_id().to_string()));
405 tracing::error!(err=?err, actor_id=%self.actor_id(), "unable to receive next py message");
406 })
407 .inspect(|_|{
408 hyperactor::metrics::ACTOR_MESSAGES_RECEIVED.add(1, hyperactor::kv_pairs!("actor_id" => self.actor_id().to_string()));
409 })
410 }
411
412 #[hyperactor::instrument(fields(actor_id=hyperactor::internal_macro_support::tracing::field::display(self.actor_id())))]
414 pub fn drain_and_stop(&mut self) -> Result<Vec<M>> {
415 self.ensure_detached_and_alive()?;
416 let messages: Vec<M> = self.message_receiver.drain().into_iter().collect();
417 tracing::info!("stopping the client actor in Python client");
418 self.status = InstanceStatus::Stopped;
419 Ok(messages)
420 }
421
422 pub fn instance(&self) -> &Instance<()> {
423 &self.instance
424 }
425
426 pub fn actor_id(&self) -> &reference::ActorId {
427 &self.actor_id
428 }
429}
430
431pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
432 hyperactor_mod.add_class::<PyProc>()?;
433 hyperactor_mod.add_class::<PyActorId>()?;
434 hyperactor_mod.add_class::<PySerialized>()?;
435 Ok(())
436}