monarch_hyperactor/proc_launcher_probe.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Probe module for validating the explicit response port contract.
10//!
11//! This module exposes a Rust function callable from Python that
12//! inspects what Rust receives when a Python actor sends on a port
13//! created via `explicit_response_port=True`.
14//!
15//! In particular, it answers the question: when Python calls
16//! `Port.send(value)` or `Port.exception(error)`, does Rust receive a
17//! `PythonMessage` envelope with `kind = Result` or `kind =
18//! Exception`?
19
20use pyo3::prelude::*;
21use pyo3::types::PyModule;
22use pyo3::types::PyModuleMethods;
23
24use crate::actor::MethodSpecifier;
25use crate::actor::PythonMessage;
26use crate::actor::PythonMessageKind;
27use crate::actor_mesh::PythonActorMesh;
28use crate::context::PyInstance;
29use crate::mailbox::EitherPortRef;
30use crate::mailbox::PythonOncePortRef;
31use crate::pickle::PendingMessage;
32use crate::pickle::PicklingState;
33use crate::pytokio::PyPythonTask;
34
35/// Report describing what Rust received on the port.
36///
37/// This is returned to Python so tests can assert on the wire-level
38/// message shape, without decoding or interpreting the payload.
39#[pyclass(
40 frozen,
41 module = "monarch._rust_bindings.monarch_hyperactor.proc_launcher_probe"
42)]
43#[derive(Debug, Clone)]
44pub struct ProbeReport {
45 /// High-level classification of what was received: e.g.
46 /// "PythonMessage" or "Error".
47 #[pyo3(get)]
48 pub received_type: String,
49
50 /// If a PythonMessage was received, the message kind ("Result",
51 /// "Exception", etc).
52 #[pyo3(get)]
53 pub kind: Option<String>,
54
55 /// If a PythonMessage was received, the `rank` field carried by
56 /// the message kind (if any).
57 #[pyo3(get)]
58 pub rank: Option<usize>,
59
60 /// Length in bytes of the raw message payload.
61 #[pyo3(get)]
62 pub payload_len: usize,
63
64 /// Raw payload bytes as received by Rust.
65 ///
66 /// Exposed so Python can decode the payload (e.g. via
67 /// cloudpickle) and verify its contents.
68 #[pyo3(get)]
69 pub payload_bytes: Vec<u8>,
70
71 /// Error message if the probe failed before receiving a message.
72 #[pyo3(get)]
73 pub error: Option<String>,
74}
75
76/// Register the probe bindings in the Python extension module.
77pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
78 module.add_class::<ProbeReport>()?;
79 module.add_function(wrap_pyfunction!(probe_exit_port_via_mesh, module)?)?;
80 Ok(())
81}
82
83/// Probe the explicit response port via the actor mesh.
84///
85/// This function:
86/// 1. Opens a `OncePort<PythonMessage>` from the given mailbox.
87/// 2. Sends a `CallMethod(ExplicitPort)` message to `method_name` via
88/// `actor_mesh_inner.cast_unresolved(...)`.
89/// 3. Awaits the first message received on the port.
90/// 4. Returns a `ProbeReport` describing what Rust observed.
91///
92/// The purpose is not to test endpoint semantics, but to validate the
93/// *wire envelope* delivered to Rust for explicit response ports.
94///
95/// Arguments:
96/// - `actor_mesh_inner`: The internal actor mesh used to dispatch the
97/// call.
98/// - `instance`: The calling context's Rust instance handle.
99/// - `mailbox`: The mailbox used to allocate the response port.
100/// - `method_name`: Name of the Python endpoint to invoke.
101/// - `pickling_state`: The pickled arguments wrapped in a PicklingState.
102///
103/// Returns:
104/// An awaitable task yielding a `ProbeReport`.
105#[pyfunction]
106#[pyo3(signature = (actor_mesh_inner, instance, method_name, pickling_state))]
107pub(crate) fn probe_exit_port_via_mesh(
108 actor_mesh_inner: &PythonActorMesh,
109 instance: &PyInstance,
110 method_name: String,
111 pickling_state: PyRefMut<'_, PicklingState>,
112) -> PyResult<PyPythonTask> {
113 // Open a OncePort<PythonMessage> - this is what ActorProcLauncher
114 // does
115 let (exit_port, exit_port_rx) = instance
116 ._mailbox()
117 .get_inner()
118 .open_once_port::<PythonMessage>();
119
120 // Build the PythonMessageKind with ExplicitPort
121 let bound_port = exit_port.bind();
122 let kind = PythonMessageKind::CallMethod {
123 name: MethodSpecifier::ExplicitPort {
124 name: method_name.clone(),
125 },
126 response_port: Some(EitherPortRef::Once(PythonOncePortRef::from(bound_port))),
127 };
128
129 // Create a PendingMessage using py_new which takes PyRefMut
130 let mut pending_message = PendingMessage::py_new(kind, pickling_state)?;
131
132 // Cast to all actors in the mesh using cast_unresolved
133 actor_mesh_inner.cast_unresolved(&mut pending_message, "all", instance)?;
134
135 // Return an awaitable task that receives the result
136 PyPythonTask::new(async move {
137 let msg = exit_port_rx.recv().await.map_err(|e| {
138 PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("recv failed: {}", e))
139 })?;
140
141 let (kind, rank) = match &msg.kind {
142 PythonMessageKind::Result { rank } => ("Result".to_string(), *rank),
143 PythonMessageKind::Exception { rank } => ("Exception".to_string(), *rank),
144 PythonMessageKind::CallMethod { .. } => ("CallMethod".to_string(), None),
145 PythonMessageKind::CallMethodIndirect { .. } => {
146 ("CallMethodIndirect".to_string(), None)
147 }
148 PythonMessageKind::Uninit {} => ("Uninit".to_string(), None),
149 };
150
151 let payload = msg.message.to_bytes().to_vec();
152 Ok(ProbeReport {
153 received_type: "PythonMessage".to_string(),
154 kind: Some(kind),
155 rank,
156 payload_len: payload.len(),
157 payload_bytes: payload,
158 error: None,
159 })
160 })
161}