monarch_hyperactor/
proc_launcher_probe.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Probe module for validating the explicit response port contract.
10//!
11//! This module exposes a Rust function callable from Python that
12//! inspects what Rust receives when a Python actor sends on a port
13//! created via `explicit_response_port=True`.
14//!
15//! In particular, it answers the question: when Python calls
16//! `Port.send(value)` or `Port.exception(error)`, does Rust receive a
17//! `PythonMessage` envelope with `kind = Result` or `kind =
18//! Exception`?
19
20use pyo3::prelude::*;
21use pyo3::types::PyModule;
22use pyo3::types::PyModuleMethods;
23
24use crate::actor::MethodSpecifier;
25use crate::actor::PythonMessage;
26use crate::actor::PythonMessageKind;
27use crate::actor_mesh::PythonActorMesh;
28use crate::context::PyInstance;
29use crate::mailbox::EitherPortRef;
30use crate::mailbox::PythonOncePortRef;
31use crate::pickle::PendingMessage;
32use crate::pickle::PicklingState;
33use crate::pytokio::PyPythonTask;
34
35/// Report describing what Rust received on the port.
36///
37/// This is returned to Python so tests can assert on the wire-level
38/// message shape, without decoding or interpreting the payload.
39#[pyclass(
40    frozen,
41    module = "monarch._rust_bindings.monarch_hyperactor.proc_launcher_probe"
42)]
43#[derive(Debug, Clone)]
44pub struct ProbeReport {
45    /// High-level classification of what was received: e.g.
46    /// "PythonMessage" or "Error".
47    #[pyo3(get)]
48    pub received_type: String,
49
50    /// If a PythonMessage was received, the message kind ("Result",
51    /// "Exception", etc).
52    #[pyo3(get)]
53    pub kind: Option<String>,
54
55    /// If a PythonMessage was received, the `rank` field carried by
56    /// the message kind (if any).
57    #[pyo3(get)]
58    pub rank: Option<usize>,
59
60    /// Length in bytes of the raw message payload.
61    #[pyo3(get)]
62    pub payload_len: usize,
63
64    /// Raw payload bytes as received by Rust.
65    ///
66    /// Exposed so Python can decode the payload (e.g. via
67    /// cloudpickle) and verify its contents.
68    #[pyo3(get)]
69    pub payload_bytes: Vec<u8>,
70
71    /// Error message if the probe failed before receiving a message.
72    #[pyo3(get)]
73    pub error: Option<String>,
74}
75
76/// Register the probe bindings in the Python extension module.
77pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
78    module.add_class::<ProbeReport>()?;
79    module.add_function(wrap_pyfunction!(probe_exit_port_via_mesh, module)?)?;
80    Ok(())
81}
82
83/// Probe the explicit response port via the actor mesh.
84///
85/// This function:
86/// 1. Opens a `OncePort<PythonMessage>` from the given mailbox.
87/// 2. Sends a `CallMethod(ExplicitPort)` message to `method_name` via
88///    `actor_mesh_inner.cast_unresolved(...)`.
89/// 3. Awaits the first message received on the port.
90/// 4. Returns a `ProbeReport` describing what Rust observed.
91///
92/// The purpose is not to test endpoint semantics, but to validate the
93/// *wire envelope* delivered to Rust for explicit response ports.
94///
95/// Arguments:
96/// - `actor_mesh_inner`: The internal actor mesh used to dispatch the
97///   call.
98/// - `instance`: The calling context's Rust instance handle.
99/// - `mailbox`: The mailbox used to allocate the response port.
100/// - `method_name`: Name of the Python endpoint to invoke.
101/// - `pickling_state`: The pickled arguments wrapped in a PicklingState.
102///
103/// Returns:
104/// An awaitable task yielding a `ProbeReport`.
105#[pyfunction]
106#[pyo3(signature = (actor_mesh_inner, instance, method_name, pickling_state))]
107pub(crate) fn probe_exit_port_via_mesh(
108    actor_mesh_inner: &PythonActorMesh,
109    instance: &PyInstance,
110    method_name: String,
111    pickling_state: PyRefMut<'_, PicklingState>,
112) -> PyResult<PyPythonTask> {
113    // Open a OncePort<PythonMessage> - this is what ActorProcLauncher
114    // does
115    let (exit_port, exit_port_rx) = instance
116        ._mailbox()
117        .get_inner()
118        .open_once_port::<PythonMessage>();
119
120    // Build the PythonMessageKind with ExplicitPort
121    let bound_port = exit_port.bind();
122    let kind = PythonMessageKind::CallMethod {
123        name: MethodSpecifier::ExplicitPort {
124            name: method_name.clone(),
125        },
126        response_port: Some(EitherPortRef::Once(PythonOncePortRef::from(bound_port))),
127    };
128
129    // Create a PendingMessage using py_new which takes PyRefMut
130    let mut pending_message = PendingMessage::py_new(kind, pickling_state)?;
131
132    // Cast to all actors in the mesh using cast_unresolved
133    actor_mesh_inner.cast_unresolved(&mut pending_message, "all", instance)?;
134
135    // Return an awaitable task that receives the result
136    PyPythonTask::new(async move {
137        let msg = exit_port_rx.recv().await.map_err(|e| {
138            PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("recv failed: {}", e))
139        })?;
140
141        let (kind, rank) = match &msg.kind {
142            PythonMessageKind::Result { rank } => ("Result".to_string(), *rank),
143            PythonMessageKind::Exception { rank } => ("Exception".to_string(), *rank),
144            PythonMessageKind::CallMethod { .. } => ("CallMethod".to_string(), None),
145            PythonMessageKind::CallMethodIndirect { .. } => {
146                ("CallMethodIndirect".to_string(), None)
147            }
148            PythonMessageKind::Uninit {} => ("Uninit".to_string(), None),
149        };
150
151        let payload = msg.message.to_bytes().to_vec();
152        Ok(ProbeReport {
153            received_type: "PythonMessage".to_string(),
154            kind: Some(kind),
155            rank,
156            payload_len: payload.len(),
157            payload_bytes: payload,
158            error: None,
159        })
160    })
161}