1use std::hash::DefaultHasher;
10use std::hash::Hash;
11use std::hash::Hasher;
12use std::time::Duration;
13
14use anyhow::Result;
15use hyperactor::RemoteMessage;
16use hyperactor::actor::Signal;
17use hyperactor::channel::ChannelAddr;
18use hyperactor::mailbox::PortReceiver;
19use hyperactor::proc::Instance;
20use hyperactor::proc::Proc;
21use monarch_types::PickledPyObject;
22use pyo3::exceptions::PyRuntimeError;
23use pyo3::exceptions::PyValueError;
24use pyo3::prelude::*;
25use pyo3::types::PyList;
26use pyo3::types::PyType;
27
28use crate::actor::PythonActor;
29use crate::actor::PythonActorHandle;
30use crate::runtime::signal_safe_block_on;
31
/// Wrapper around a hyperactor [`Proc`], exposed to Python as `Proc` in
/// `monarch._rust_bindings.monarch_hyperactor.proc`.
#[derive(Clone, Debug)]
#[pyclass(
    name = "Proc",
    module = "monarch._rust_bindings.monarch_hyperactor.proc"
)]
pub struct PyProc {
    /// The wrapped proc; `pub(super)` so the parent module can reach it directly.
    pub(super) inner: Proc,
}
41
42#[pymethods]
43impl PyProc {
44 #[new]
45 #[pyo3(signature = ())]
46 fn new() -> PyResult<Self> {
47 Ok(Self {
48 inner: Proc::isolated(),
49 })
50 }
51
52 #[getter]
53 fn addr(&self) -> String {
54 self.inner.proc_addr().addr().to_string()
55 }
56
57 #[getter]
58 fn name(&self) -> String {
59 self.inner
60 .proc_addr()
61 .label()
62 .map(|l: &hyperactor::id::Label| l.as_str().to_string())
63 .unwrap_or_else(|| self.inner.proc_addr().id().to_string())
64 }
65
66 #[getter]
67 fn id(&self) -> String {
68 self.inner.proc_addr().to_string()
69 }
70
71 fn destroy<'py>(
72 &mut self,
73 timeout_in_secs: u64,
74 py: Python<'py>,
75 ) -> PyResult<Bound<'py, PyList>> {
76 let mut inner = self.inner.clone();
77 let (_stopped, aborted) = signal_safe_block_on(py, async move {
78 inner
79 .destroy_and_wait(Duration::from_secs(timeout_in_secs), "destroy")
80 .await
81 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
82 })??;
83 let aborted_actors = aborted
84 .into_iter()
85 .map(|actor_id| format!("{}", actor_id))
86 .collect::<Vec<_>>();
87 PyList::new(py, aborted_actors)
90 }
91
92 #[pyo3(signature = (actor, name=None))]
93 fn spawn<'py>(
94 &self,
95 py: Python<'py>,
96 actor: &Bound<'py, PyType>,
97 name: Option<String>,
98 ) -> PyResult<Bound<'py, PyAny>> {
99 let proc = self.inner.clone();
100 let pickled_type = PickledPyObject::pickle(actor.as_any())?;
101 crate::runtime::future_into_py(py, async move {
102 Ok(PythonActorHandle {
103 inner: proc.spawn(
104 name.as_deref().unwrap_or("anon"),
105 PythonActor::new(pickled_type, None, None, None)?,
106 )?,
107 })
108 })
109 }
110
111 #[pyo3(signature = (actor, name=None))]
112 fn spawn_blocking<'py>(
113 &self,
114 py: Python<'py>,
115 actor: &Bound<'py, PyType>,
116 name: Option<String>,
117 ) -> PyResult<PythonActorHandle> {
118 let proc = self.inner.clone();
119 let pickled_type = PickledPyObject::pickle(actor.as_any())?;
120 Ok(PythonActorHandle {
121 inner: signal_safe_block_on(py, async move {
122 proc.spawn(
123 name.as_deref().unwrap_or("anon"),
124 PythonActor::new(pickled_type, None, None, None)?,
125 )
126 })
127 .map_err(|e| PyRuntimeError::new_err(e.to_string()))??,
128 })
129 }
130}
131
132impl PyProc {
133 pub fn new_from_proc(proc: Proc) -> Self {
134 Self { inner: proc }
135 }
136}
137
/// Wrapper around a `hyperactor::ActorAddr`, exposed to Python as the frozen
/// class `ActorAddr` in `monarch._rust_bindings.monarch_hyperactor.proc`.
#[pyclass(
    frozen,
    name = "ActorAddr",
    module = "monarch._rust_bindings.monarch_hyperactor.proc"
)]
#[derive(Clone)]
pub struct PyActorAddr {
    /// The wrapped actor address; `pub(super)` so the parent module can reach it directly.
    pub(super) inner: hyperactor::ActorAddr,
}
147
148impl From<hyperactor::ActorAddr> for PyActorAddr {
149 fn from(actor_id: hyperactor::ActorAddr) -> Self {
150 Self { inner: actor_id }
151 }
152}
153
154impl From<PyActorAddr> for hyperactor::ActorAddr {
155 fn from(val: PyActorAddr) -> Self {
156 val.inner
157 }
158}
159
160#[pymethods]
161impl PyActorAddr {
162 #[new]
163 #[pyo3(signature = (*, addr, proc_name, actor_name))]
164 fn new(addr: &str, proc_name: &str, actor_name: &str) -> PyResult<Self> {
165 let addr: ChannelAddr = addr.parse().map_err(|e| {
166 PyValueError::new_err(format!("Failed to parse channel address '{}': {}", addr, e))
167 })?;
168 Ok(Self {
169 inner: hyperactor::ProcAddr::singleton(addr, proc_name).actor_addr(actor_name),
170 })
171 }
172
173 #[staticmethod]
174 fn from_string(actor_id: &str) -> PyResult<Self> {
175 Ok(Self {
176 inner: actor_id.parse().map_err(|e| {
177 PyValueError::new_err(format!(
178 "Failed to extract actor id from {}: {}",
179 actor_id, e
180 ))
181 })?,
182 })
183 }
184
185 #[getter]
186 fn addr(&self) -> String {
187 self.inner.proc_addr().addr().to_string()
188 }
189
190 #[getter]
191 fn proc_name(&self) -> String {
192 self.inner
193 .proc_addr()
194 .label()
195 .map(|l: &hyperactor::id::Label| l.as_str().to_string())
196 .unwrap_or_else(|| self.inner.proc_addr().id().to_string())
197 }
198
199 #[getter]
200 fn actor_name(&self) -> String {
201 self.inner
202 .label()
203 .map(|l: &hyperactor::id::Label| l.as_str().to_string())
204 .unwrap_or_else(|| self.inner.uid().to_string())
205 }
206
207 #[getter]
208 fn label(&self) -> Option<String> {
209 self.inner
210 .label()
211 .map(|l: &hyperactor::id::Label| l.as_str().to_string())
212 }
213
214 #[getter]
215 fn proc_label(&self) -> Option<String> {
216 self.inner
217 .proc_addr()
218 .label()
219 .map(|l: &hyperactor::id::Label| l.as_str().to_string())
220 }
221
222 #[getter]
223 fn uid(&self) -> String {
224 self.inner.uid().to_string()
225 }
226
227 #[getter]
228 fn pid(&self) -> String {
229 self.uid()
230 }
231
232 #[getter]
233 fn proc_id(&self) -> String {
234 self.inner.proc_addr().to_string()
235 }
236
237 #[getter]
238 fn is_root(&self) -> bool {
239 self.inner.is_root()
240 }
241
242 fn __str__(&self) -> String {
243 self.inner.to_string()
244 }
245
246 fn __hash__(&self) -> u64 {
247 let mut hasher = DefaultHasher::new();
248 self.inner.to_string().hash(&mut hasher);
249 hasher.finish()
250 }
251
252 fn __eq__(&self, other: &Bound<'_, PyAny>) -> PyResult<bool> {
253 if let Ok(other) = other.extract::<PyActorAddr>() {
254 Ok(self.inner == other.inner)
255 } else {
256 Ok(false)
257 }
258 }
259
260 fn __reduce__<'py>(slf: &Bound<'py, Self>) -> PyResult<(Bound<'py, PyAny>, (String,))> {
261 Ok((slf.getattr("from_string")?, (slf.borrow().__str__(),)))
262 }
263}
264
265impl From<&PyActorAddr> for hyperactor::ActorAddr {
266 fn from(actor_id: &PyActorAddr) -> Self {
267 actor_id.inner.clone()
268 }
269}
270
// Debug output for the wrapper is whatever the wrapped address's `fmt` writes;
// the wrapper adds no framing of its own.
impl std::fmt::Debug for PyActorAddr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // NOTE(review): which `fmt` this resolves to depends on what is in scope
        // for `hyperactor::ActorAddr` — presumably its `Debug` impl; confirm.
        self.inner.fmt(f)
    }
}
276
/// Lifecycle state tracked by `InstanceWrapper`.
#[derive(Clone, Copy, PartialEq, Eq)]
enum InstanceStatus {
    /// Initial state set by `InstanceWrapper::new`.
    Running,
    /// Set once a `Signal::Stop` has been observed or `drain_and_stop` has run.
    Stopped,
}
282
/// A serialized message paired with a port number, exposed to Python as the
/// frozen class `Serialized` in `monarch._rust_bindings.monarch_hyperactor.proc`.
#[pyclass(
    frozen,
    name = "Serialized",
    module = "monarch._rust_bindings.monarch_hyperactor.proc"
)]
#[derive(Debug)]
pub struct PySerialized {
    /// The serialized message payload.
    inner: wirevalue::Any,
    /// Port value taken from `M::port()` when built through `PySerialized::new`.
    port: u64,
}
296
297impl PySerialized {
298 pub fn new<M: RemoteMessage>(message: &M) -> PyResult<Self> {
299 Ok(Self {
300 inner: wirevalue::Any::serialize(message).map_err(|err| {
301 PyRuntimeError::new_err(format!(
302 "failed to serialize message of type {} to Any: {}",
303 std::any::type_name::<M>(),
304 err
305 ))
306 })?,
307 port: M::port(),
308 })
309 }
310
311 pub fn deserialized<M: RemoteMessage>(&self) -> PyResult<M> {
312 self.inner.deserialized().map_err(|err| {
313 PyRuntimeError::new_err(format!("failed to deserialize message: {}", err))
314 })
315 }
316
317 pub fn port(&self) -> u64 {
319 self.port
320 }
321}
322
/// Bundles a client `Instance` with the receivers bound on it for messages of
/// type `M` and for `Signal`s, plus the running/stopped status.
pub struct InstanceWrapper<M: RemoteMessage> {
    /// The client instance obtained from the proc in `new`.
    instance: Instance<()>,
    /// Receiver for `M` messages, from `bind_handler_port::<M>()`.
    message_receiver: PortReceiver<M>,
    /// Receiver for `Signal`s, from `bind_handler_port::<Signal>()`.
    signal_receiver: PortReceiver<Signal>,
    /// `Running` until a stop signal is seen or `drain_and_stop` is called.
    status: InstanceStatus,
    /// Clone of the instance's own address, captured in `new`.
    actor_id: hyperactor::ActorAddr,
}
333
334impl<M: RemoteMessage> InstanceWrapper<M> {
335 pub fn new(proc: &PyProc, actor_name: &str) -> Result<Self> {
336 let instance = proc.inner.client(actor_name)?.0;
337 let (_handler_port, message_receiver) = instance.bind_handler_port::<M>();
339
340 let (_signal_port, signal_receiver) = instance.bind_handler_port::<Signal>();
341
342 let actor_id = instance.self_addr().clone();
343
344 Ok(Self {
345 instance,
346 message_receiver,
347 signal_receiver,
348 status: InstanceStatus::Running,
349 actor_id,
350 })
351 }
352
353 pub fn send(&self, actor_id: &PyActorAddr, message: &PySerialized) -> PyResult<()> {
356 hyperactor::internal_macro_support::tracing::debug!(
357 name = "py_send_message",
358 actor_id =
359 hyperactor::internal_macro_support::tracing::field::display(self.actor_addr()),
360 receiver_actor_id = tracing::field::display(&actor_id.inner),
361 ?message,
362 );
363 actor_id
364 .inner
365 .port_addr(message.port().into())
366 .send(&self.instance, message.inner.clone());
367 Ok(())
368 }
369
370 fn ensure_detached_and_alive(&mut self) -> Result<()> {
372 anyhow::ensure!(
373 self.status == InstanceStatus::Running,
374 "actor is not running"
375 );
376
377 let signals = self.signal_receiver.drain();
385 if signals
386 .into_iter()
387 .any(|sig| matches!(sig, Signal::Stop(_)))
388 {
389 self.status = InstanceStatus::Stopped;
390 anyhow::bail!("actor has been stopped");
391 }
392
393 Ok(())
394 }
395
396 #[hyperactor::instrument(level = "trace", fields(actor_id = hyperactor::internal_macro_support::tracing::field::display(self.actor_addr())))]
399 pub async fn next_message(&mut self, timeout_msec: Option<u64>) -> Result<Option<M>> {
400 hyperactor::declare_static_timer!(
401 PY_NEXT_MESSAGE_TIMER,
402 "py_next_message",
403 hyperactor_telemetry::TimeUnit::Nanos
404 );
405 let _ = PY_NEXT_MESSAGE_TIMER
406 .start(hyperactor::kv_pairs!("actor_id" => self.actor_addr().to_string(), "mode" => match timeout_msec{
407 None => "blocking",
408 Some(0) => "polling",
409 Some(_) => "blocking_with_timeout",
410 }));
411 self.ensure_detached_and_alive()?;
412 match timeout_msec {
413 None => {
415 self.message_receiver.recv().await.map(Some)},
416 Some(0) => {
417 self.message_receiver.try_recv()
420 }
421 Some(timeout_msec) => {
422 match tokio::time::timeout(
424 Duration::from_millis(timeout_msec),
425 self.message_receiver.recv(),
426 )
427 .await
428 {
429 Ok(output) => output.map(Some),
430 Err(_) => Ok(None), }
432 }
433 }
434 .map_err(|err| err.into())
435 .inspect_err(|err| {
436 hyperactor::metrics::ACTOR_MESSAGE_RECEIVE_ERRORS.add(1, hyperactor::kv_pairs!("actor_id" => self.actor_addr().to_string()));
437 tracing::error!(err=?err, actor_id=%self.actor_addr(), "unable to receive next py message");
438 })
439 .inspect(|_|{
440 hyperactor::metrics::ACTOR_MESSAGES_RECEIVED.add(1, hyperactor::kv_pairs!("actor_id" => self.actor_addr().to_string()));
441 })
442 }
443
444 #[hyperactor::instrument(fields(actor_id=hyperactor::internal_macro_support::tracing::field::display(self.actor_addr())))]
446 pub fn drain_and_stop(&mut self) -> Result<Vec<M>> {
447 self.ensure_detached_and_alive()?;
448 let messages: Vec<M> = self.message_receiver.drain().into_iter().collect();
449 tracing::info!("stopping the client actor in Python client");
450 self.status = InstanceStatus::Stopped;
451 Ok(messages)
452 }
453
454 pub fn instance(&self) -> &Instance<()> {
455 &self.instance
456 }
457
458 pub fn actor_addr(&self) -> &hyperactor::ActorAddr {
459 &self.actor_id
460 }
461}
462
463pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
464 hyperactor_mod.add_class::<PyProc>()?;
465 hyperactor_mod.add_class::<PyActorAddr>()?;
466 hyperactor_mod.add_class::<PySerialized>()?;
467 Ok(())
468}