Skip to main content

monarch_hyperactor/
endpoint.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
8
9use std::cell::Cell;
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12use std::sync::atomic::Ordering;
13
14use hyperactor::ActorAddr;
15use hyperactor::Instance;
16use hyperactor::accum::Accumulator;
17use hyperactor::accum::CommReducer;
18use hyperactor::accum::ReducerFactory;
19use hyperactor::accum::ReducerSpec;
20use hyperactor::mailbox::OncePortReceiver;
21use hyperactor::mailbox::PortReceiver;
22use hyperactor_mesh::sel;
23use hyperactor_mesh::value_mesh::ValueOverlay;
24use hyperactor_mesh::value_mesh::rle;
25use monarch_types::py_global;
26use ndslice::Extent;
27use ndslice::Selection;
28use ndslice::Shape;
29use pyo3::prelude::*;
30use pyo3::types::PyDict;
31use pyo3::types::PyTuple;
32use serde_multipart::Part;
33use typeuri::Named;
34
35use crate::actor::MethodSpecifier;
36use crate::actor::PythonActor;
37use crate::actor::PythonMessage;
38use crate::actor::PythonMessageKind;
39use crate::actor::PythonResponseMessage;
40use crate::actor_mesh::PythonActorMesh;
41use crate::actor_mesh::SupervisableActorMesh;
42use crate::actor_mesh::to_hy_sel;
43use crate::buffers::FrozenBuffer;
44use crate::context::PyInstance;
45use crate::mailbox::EitherPortRef;
46use crate::mailbox::PythonOncePortRef;
47use crate::mailbox::PythonPortRef;
48use crate::metrics::ENDPOINT_BROADCAST_ERROR;
49use crate::metrics::ENDPOINT_BROADCAST_THROUGHPUT;
50use crate::metrics::ENDPOINT_CALL_ERROR;
51use crate::metrics::ENDPOINT_CALL_LATENCY_US_HISTOGRAM;
52use crate::metrics::ENDPOINT_CALL_ONE_ERROR;
53use crate::metrics::ENDPOINT_CALL_ONE_LATENCY_US_HISTOGRAM;
54use crate::metrics::ENDPOINT_CALL_ONE_THROUGHPUT;
55use crate::metrics::ENDPOINT_CALL_THROUGHPUT;
56use crate::metrics::ENDPOINT_CHOOSE_ERROR;
57use crate::metrics::ENDPOINT_CHOOSE_LATENCY_US_HISTOGRAM;
58use crate::metrics::ENDPOINT_CHOOSE_THROUGHPUT;
59use crate::metrics::ENDPOINT_STREAM_ERROR;
60use crate::metrics::ENDPOINT_STREAM_LATENCY_US_HISTOGRAM;
61use crate::metrics::ENDPOINT_STREAM_THROUGHPUT;
62use crate::pickle::PendingMessage;
63use crate::pickle::unpickle;
64use crate::pytokio::PyPythonTask;
65use crate::pytokio::PythonTask;
66use crate::shape::PyExtent;
67use crate::shape::PyShape;
68use crate::supervision::Supervisable;
69use crate::supervision::SupervisionError;
70use crate::value_mesh::PyValueMesh;
71
72py_global!(get_context, "monarch._src.actor.actor_mesh", "context");
73py_global!(
74    create_endpoint_message,
75    "monarch._src.actor.actor_mesh",
76    "_create_endpoint_message"
77);
78py_global!(
79    dispatch_actor_rref,
80    "monarch._src.actor.actor_mesh",
81    "_dispatch_actor_rref"
82);
83py_global!(make_future, "monarch._src.actor.future", "Future");
84
85fn unpickle_from_part(py: Python<'_>, part: Part) -> PyResult<Bound<'_, PyAny>> {
86    unpickle(
87        py,
88        FrozenBuffer {
89            inner: part.into_bytes(),
90        },
91    )
92}
93
/// The kind of endpoint invocation in flight.
///
/// The adverb decides which family of telemetry metrics an invocation
/// reports to.
#[derive(Clone, Copy, Debug)]
pub(crate) enum EndpointAdverb {
    /// Invoke on every actor and gather all responses.
    Call,
    /// Invoke on a mesh that holds exactly one actor.
    CallOne,
    /// Invoke on a single actor picked from the mesh.
    Choose,
    /// Invoke on every actor and yield the responses one by one.
    Stream,
}

impl EndpointAdverb {
    /// Returns the lowercase, snake_case label for this adverb.
    fn as_str(self) -> &'static str {
        match self {
            EndpointAdverb::Stream => "stream",
            EndpointAdverb::Choose => "choose",
            EndpointAdverb::CallOne => "call_one",
            EndpointAdverb::Call => "call",
        }
    }
}
115
116/// RAII guard for recording endpoint call telemetry.
117///
118/// Records latency on drop, similar to Python's `@_with_telemetry` decorator.
119/// Call `mark_error()` before dropping to also record an error.
120pub struct RecordEndpointGuard {
121    start: tokio::time::Instant,
122    method_name: String,
123    actor_count: usize,
124    adverb: EndpointAdverb,
125    error_occurred: Cell<bool>,
126}
127
128impl RecordEndpointGuard {
129    fn new(
130        start: tokio::time::Instant,
131        method_name: String,
132        actor_count: usize,
133        adverb: EndpointAdverb,
134    ) -> Self {
135        let attributes = hyperactor_telemetry::kv_pairs!(
136            "method" => method_name.clone()
137        );
138        match adverb {
139            EndpointAdverb::Call => {
140                ENDPOINT_CALL_THROUGHPUT.add(1, attributes);
141            }
142            EndpointAdverb::CallOne => {
143                ENDPOINT_CALL_ONE_THROUGHPUT.add(1, attributes);
144            }
145            EndpointAdverb::Choose => {
146                ENDPOINT_CHOOSE_THROUGHPUT.add(1, attributes);
147            }
148            EndpointAdverb::Stream => {
149                // Throughput already recorded once at stream creation in py_stream_collector
150            }
151        }
152
153        Self {
154            start,
155            method_name,
156            actor_count,
157            adverb,
158            error_occurred: Cell::new(false),
159        }
160    }
161
162    fn mark_error(&self) {
163        self.error_occurred.set(true);
164    }
165}
166
167impl Drop for RecordEndpointGuard {
168    fn drop(&mut self) {
169        let actor_count_str = self.actor_count.to_string();
170        let attributes = hyperactor_telemetry::kv_pairs!(
171            "method" => self.method_name.clone(),
172            "actor_count" => actor_count_str
173        );
174
175        let duration_us = self.start.elapsed().as_micros();
176
177        match self.adverb {
178            EndpointAdverb::Call => {
179                ENDPOINT_CALL_LATENCY_US_HISTOGRAM.record(duration_us as f64, attributes);
180            }
181            EndpointAdverb::CallOne => {
182                ENDPOINT_CALL_ONE_LATENCY_US_HISTOGRAM.record(duration_us as f64, attributes);
183            }
184            EndpointAdverb::Choose => {
185                ENDPOINT_CHOOSE_LATENCY_US_HISTOGRAM.record(duration_us as f64, attributes);
186            }
187            EndpointAdverb::Stream => {
188                ENDPOINT_STREAM_LATENCY_US_HISTOGRAM.record(duration_us as f64, attributes);
189            }
190        }
191
192        if self.error_occurred.get() {
193            match self.adverb {
194                EndpointAdverb::Call => {
195                    ENDPOINT_CALL_ERROR.add(1, attributes);
196                }
197                EndpointAdverb::CallOne => {
198                    ENDPOINT_CALL_ONE_ERROR.add(1, attributes);
199                }
200                EndpointAdverb::Choose => {
201                    ENDPOINT_CHOOSE_ERROR.add(1, attributes);
202                }
203                EndpointAdverb::Stream => {
204                    ENDPOINT_STREAM_ERROR.add(1, attributes);
205                }
206            }
207        }
208    }
209}
210
211/// Send-safe RAII guard for an OTEL-style endpoint span.
212///
213/// We only need endpoint spans for telemetry slices, not for `tracing` context
214/// propagation. So this guard emits synthetic trace events directly into the
215/// unified telemetry dispatcher instead of holding a real `tracing::Span`
216/// across `.await` points.
217pub(crate) struct SpanGuard {
218    id: u64,
219}
220
221impl SpanGuard {
222    fn actor_endpoint(name: &'static str, actor_id: &ActorAddr, mesh: &str, method: &str) -> Self {
223        Self {
224            id: hyperactor_telemetry::start_user_span(
225                name,
226                hyperactor_telemetry::sinks::perfetto::ENDPOINT_TELEMETRY_TARGET,
227                [
228                    (
229                        "actor_id",
230                        hyperactor_telemetry::trace_dispatcher::FieldValue::Str(
231                            actor_id.to_string(),
232                        ),
233                    ),
234                    (
235                        "mesh",
236                        hyperactor_telemetry::trace_dispatcher::FieldValue::Str(mesh.to_string()),
237                    ),
238                    (
239                        "method",
240                        hyperactor_telemetry::trace_dispatcher::FieldValue::Str(method.to_string()),
241                    ),
242                ],
243            ),
244        }
245    }
246
247    fn remote(name: &'static str, actor_id: &ActorAddr, call_name: &str) -> Self {
248        Self {
249            id: hyperactor_telemetry::start_user_span(
250                name,
251                hyperactor_telemetry::sinks::perfetto::ENDPOINT_TELEMETRY_TARGET,
252                [
253                    (
254                        "actor_id",
255                        hyperactor_telemetry::trace_dispatcher::FieldValue::Str(
256                            actor_id.to_string(),
257                        ),
258                    ),
259                    (
260                        "call_name",
261                        hyperactor_telemetry::trace_dispatcher::FieldValue::Str(
262                            call_name.to_string(),
263                        ),
264                    ),
265                ],
266            ),
267        }
268    }
269}
270
271impl Drop for SpanGuard {
272    fn drop(&mut self) {
273        hyperactor_telemetry::end_user_span(self.id);
274    }
275}
276
277fn supervision_error_to_pyerr(err: PyErr, qualified_endpoint_name: &Option<String>) -> PyErr {
278    match qualified_endpoint_name {
279        Some(endpoint) => {
280            Python::attach(|py| SupervisionError::set_endpoint_on_err(py, err, endpoint.clone()))
281        }
282        None => err,
283    }
284}
285
286async fn collect_value(
287    rx: &mut PortReceiver<PythonMessage>,
288    supervision_monitor: &Option<Arc<dyn Supervisable>>,
289    instance: &Instance<PythonActor>,
290    qualified_endpoint_name: &Option<String>,
291) -> PyResult<(Part, Option<usize>)> {
292    enum RaceResult {
293        Message(Box<PythonMessage>),
294        SupervisionError(PyErr),
295        RecvError(String),
296    }
297
298    let race_result = match supervision_monitor {
299        Some(sup) => {
300            tokio::select! {
301                biased;
302                result = sup.supervision_event(instance) => {
303                    match result {
304                        Some(err) => RaceResult::SupervisionError(err),
305                        None => {
306                            match rx.recv().await {
307                                Ok(msg) => RaceResult::Message(Box::new(msg)),
308                                Err(e) => RaceResult::RecvError(e.to_string()),
309                            }
310                        }
311                    }
312                }
313                msg = rx.recv() => {
314                    match msg {
315                        Ok(m) => RaceResult::Message(Box::new(m)),
316                        Err(e) => RaceResult::RecvError(e.to_string()),
317                    }
318                }
319            }
320        }
321        _ => match rx.recv().await {
322            Ok(msg) => RaceResult::Message(Box::new(msg)),
323            Err(e) => RaceResult::RecvError(e.to_string()),
324        },
325    };
326
327    match race_result {
328        RaceResult::Message(boxed) => {
329            let PythonMessage { kind, message, .. } = *boxed;
330            match kind {
331                PythonMessageKind::Result { rank, .. } => Ok((message, rank)),
332                PythonMessageKind::Exception { .. } => {
333                    Python::attach(|py| Err(PyErr::from_value(unpickle_from_part(py, message)?)))
334                }
335                other => Err(pyo3::exceptions::PyValueError::new_err(format!(
336                    "unexpected message kind {:?}",
337                    other
338                ))),
339            }
340        }
341        RaceResult::RecvError(e) => Err(pyo3::exceptions::PyEOFError::new_err(format!(
342            "Port closed: {}",
343            e
344        ))),
345        RaceResult::SupervisionError(err) => {
346            Err(supervision_error_to_pyerr(err, qualified_endpoint_name))
347        }
348    }
349}
350
351async fn collect_valuemesh(
352    extent: Extent,
353    rx: OncePortReceiver<PythonMessage>,
354    method_name: String,
355    supervision_monitor: Option<Arc<dyn Supervisable>>,
356    instance: &Instance<PythonActor>,
357    qualified_endpoint_name: Option<String>,
358) -> PyResult<Py<PyAny>> {
359    let start = tokio::time::Instant::now();
360
361    let expected_count = extent.num_ranks();
362
363    let record_guard = RecordEndpointGuard::new(
364        start,
365        method_name.clone(),
366        expected_count,
367        EndpointAdverb::Call,
368    );
369
370    enum RaceResult {
371        Collected(Box<PythonMessage>),
372        SupervisionError(PyErr),
373        RecvError(String),
374    }
375
376    let race_result = match &supervision_monitor {
377        Some(sup) => {
378            tokio::select! {
379                biased;
380                result = sup.supervision_event(instance) => {
381                    match result {
382                        Some(err) => RaceResult::SupervisionError(err),
383                        None => RaceResult::RecvError(
384                            "supervision monitor closed unexpectedly".to_string()
385                        ),
386                    }
387                }
388                batch = rx.recv() => {
389                    match batch {
390                        Ok(b) => RaceResult::Collected(Box::new(b)),
391                        Err(e) => RaceResult::RecvError(e.to_string()),
392                    }
393                }
394            }
395        }
396        None => match rx.recv().await {
397            Ok(batch) => RaceResult::Collected(Box::new(batch)),
398            Err(e) => RaceResult::RecvError(e.to_string()),
399        },
400    };
401
402    match race_result {
403        RaceResult::Collected(boxed) => {
404            let msg = *boxed;
405            let overlay = msg.into_overlay().map_err(|e| {
406                pyo3::exceptions::PyRuntimeError::new_err(format!(
407                    "failed to extract overlay from collected responses: {e}"
408                ))
409            })?;
410            Python::attach(|py| {
411                Ok(PyValueMesh::build_from_parts(
412                    &extent,
413                    overlay.runs().try_fold(
414                        Vec::with_capacity(expected_count),
415                        |mut parts, (range, payload)| match payload {
416                            PythonResponseMessage::Result(part) => {
417                                parts.extend(range.clone().map(|_| part.clone()));
418                                Ok(parts)
419                            }
420                            PythonResponseMessage::Exception(part) => {
421                                record_guard.mark_error();
422                                Python::attach(|py| {
423                                    Err(PyErr::from_value(unpickle_from_part(py, part.clone())?))
424                                })
425                            }
426                        },
427                    )?,
428                )?
429                .into_pyobject(py)?
430                .into_any()
431                .unbind())
432            })
433        }
434        RaceResult::RecvError(e) => {
435            record_guard.mark_error();
436            Err(pyo3::exceptions::PyEOFError::new_err(format!(
437                "Port closed: {}",
438                e
439            )))
440        }
441        RaceResult::SupervisionError(err) => {
442            record_guard.mark_error();
443            Err(supervision_error_to_pyerr(err, &qualified_endpoint_name))
444        }
445    }
446}
447
448fn value_collector(
449    mut receiver: PortReceiver<PythonMessage>,
450    method_name: String,
451    supervision_monitor: Option<Arc<dyn Supervisable>>,
452    instance: Instance<PythonActor>,
453    qualified_endpoint_name: Option<String>,
454    adverb: EndpointAdverb,
455    span_guard: SpanGuard,
456) -> PyResult<PyPythonTask> {
457    Ok(PythonTask::new(async move {
458        let _span_guard = span_guard;
459        let start = tokio::time::Instant::now();
460
461        let record_guard = RecordEndpointGuard::new(start, method_name, 1, adverb);
462
463        match collect_value(
464            &mut receiver,
465            &supervision_monitor,
466            &instance,
467            &qualified_endpoint_name,
468        )
469        .await
470        {
471            Ok((message, _)) => {
472                Python::attach(|py| unpickle_from_part(py, message).map(|obj| obj.unbind()))
473            }
474            Err(e) => {
475                record_guard.mark_error();
476                Err(e)
477            }
478        }
479    })?
480    .into())
481}
482
483/// A streaming iterator that yields futures for each response from actors.
484///
485/// Implements Python's iterator protocol (`__iter__`/`__next__`) to yield
486/// `Future` objects that resolve to individual actor responses.
487#[pyclass(
488    name = "ValueStream",
489    module = "monarch._rust_bindings.monarch_hyperactor.endpoint"
490)]
491pub struct PyValueStream {
492    receiver: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
493    /// Supervisor for monitoring actor health during streaming.
494    supervision_monitor: Option<Arc<dyn Supervisable>>,
495    instance: Instance<PythonActor>,
496    remaining: AtomicUsize,
497    method_name: String,
498    qualified_endpoint_name: Option<String>,
499    start: tokio::time::Instant,
500    actor_count: usize,
501    future_class: Py<PyAny>,
502}
503
504#[pymethods]
505impl PyValueStream {
506    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
507        slf
508    }
509
510    fn __next__(&self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
511        let remaining = self.remaining.load(Ordering::Relaxed);
512        if remaining == 0 {
513            return Ok(None);
514        }
515        self.remaining.store(remaining - 1, Ordering::Relaxed);
516
517        let receiver = self.receiver.clone();
518        let supervision_monitor = self.supervision_monitor.clone();
519        let instance = self.instance.clone_for_py();
520        let qualified_endpoint_name = self.qualified_endpoint_name.clone();
521        let start = self.start;
522        let method_name = self.method_name.clone();
523        let actor_count = self.actor_count;
524
525        let task: PyPythonTask = PythonTask::new(async move {
526            let record_guard =
527                RecordEndpointGuard::new(start, method_name, actor_count, EndpointAdverb::Stream);
528
529            let mut rx_guard = receiver.lock().await;
530
531            match collect_value(
532                &mut rx_guard,
533                &supervision_monitor,
534                &instance,
535                &qualified_endpoint_name,
536            )
537            .await
538            {
539                Ok((message, _)) => {
540                    Python::attach(|py| unpickle_from_part(py, message).map(|obj| obj.unbind()))
541                }
542                Err(e) => {
543                    record_guard.mark_error();
544                    Err(e)
545                }
546            }
547        })?
548        .into();
549
550        let kwargs = PyDict::new(py);
551        kwargs.set_item("coro", task)?;
552        let future = self.future_class.call(py, (), Some(&kwargs))?;
553        Ok(Some(future))
554    }
555}
556
557fn wrap_in_future(py: Python<'_>, task: PyPythonTask) -> PyResult<Py<PyAny>> {
558    let kwargs = PyDict::new(py);
559    kwargs.set_item("coro", task)?;
560    let future = make_future(py).call((), Some(&kwargs))?;
561    Ok(future.unbind())
562}
563
564/// Trait that defines the core operations an endpoint must provide.
565/// Both ActorEndpoint and RemoteEndpoint implement this trait.
566pub(crate) trait Endpoint {
567    /// Get the extent of the endpoint's targets.
568    fn get_extent(&self, py: Python<'_>) -> PyResult<Extent>;
569
570    /// Get the method name for this endpoint.
571    fn get_method_name(&self) -> &str;
572
573    /// Create and send a message with the given args/kwargs.
574    fn send_message<'py>(
575        &self,
576        py: Python<'py>,
577        args: &Bound<'py, PyTuple>,
578        kwargs: Option<&Bound<'py, PyDict>>,
579        port_ref: Option<EitherPortRef>,
580        selection: Selection,
581        instance: &Instance<PythonActor>,
582    ) -> PyResult<()>;
583
584    /// Like `send_message` but stamps `caller_headers` onto the
585    /// outgoing request envelope. Implementations that can carry
586    /// headers override this; the default delegates to `send_message`
587    /// and drops them.
588    fn send_message_with_headers<'py>(
589        &self,
590        py: Python<'py>,
591        args: &Bound<'py, PyTuple>,
592        kwargs: Option<&Bound<'py, PyDict>>,
593        port_ref: Option<EitherPortRef>,
594        selection: Selection,
595        instance: &Instance<PythonActor>,
596        _caller_headers: hyperactor_config::Flattrs,
597    ) -> PyResult<()> {
598        self.send_message(py, args, kwargs, port_ref, selection, instance)
599    }
600
601    /// Build the operation-context envelope headers to stamp on an
602    /// outgoing request for this endpoint invocation. The result is
603    /// empty when the endpoint cannot supply a qualified name
604    /// (e.g. `RemoteEndpoint`'s `get_qualified_name` returns `None`),
605    /// in which case callers see the unchanged dispatch surface.
606    fn build_operation_context_headers(
607        &self,
608        adverb: EndpointAdverb,
609    ) -> hyperactor_config::Flattrs {
610        let adverb_str = match adverb {
611            EndpointAdverb::Call => "call",
612            EndpointAdverb::CallOne => "call_one",
613            EndpointAdverb::Choose => "choose",
614            EndpointAdverb::Stream => "stream",
615        };
616        let attrs = crate::operation_context::build_operation_context_attrs(
617            self.get_qualified_name(),
618            Some(adverb_str),
619        );
620        let mut headers = hyperactor_config::Flattrs::new();
621        crate::operation_context::stamp_operation_context(&mut headers, &attrs);
622        headers
623    }
624
625    /// Get the supervision_monitor for this endpoint (if any).
626    fn get_supervision_monitor(&self) -> Option<Arc<dyn Supervisable>>;
627
628    /// Get the qualified endpoint name for error messages (if any).
629    fn get_qualified_name(&self) -> Option<String>;
630
631    /// Open an OTEL-style span for an endpoint invocation. Each impl attaches
632    /// the fields the perfetto sink needs to synthesize the display name
633    /// (`{mesh}.{method}.{adverb}` for ActorEndpoint, `{call_name}.{adverb}`
634    /// for Remote) and route the slice to an actor-specific track. The adverb is the span name,
635    /// so no formatting happens at the call site and the sink formats only when
636    /// it renders the slice.
637    fn enter_endpoint_span(&self, adverb: EndpointAdverb, actor_id: &ActorAddr) -> SpanGuard;
638
639    fn get_current_instance(&self, py: Python<'_>) -> PyResult<Instance<PythonActor>> {
640        let context = get_context(py).call0()?;
641        let py_instance: PyRef<PyInstance> = context.getattr("actor_instance")?.extract()?;
642        Ok(py_instance.clone().into_instance())
643    }
644
645    fn open_response_port(
646        &self,
647        instance: &Instance<PythonActor>,
648    ) -> (PythonPortRef, PortReceiver<PythonMessage>) {
649        let (p, receiver) = instance.mailbox_for_py().open_port::<PythonMessage>();
650        (PythonPortRef { inner: p.bind() }, receiver)
651    }
652
653    fn open_reduce_response_port(
654        &self,
655        instance: &Instance<PythonActor>,
656    ) -> (PythonOncePortRef, OncePortReceiver<PythonMessage>) {
657        let (p, receiver) = instance
658            .mailbox_for_py()
659            .open_reduce_port(PythonResponseMessageAccumulator);
660        (PythonOncePortRef::from(p.bind()), receiver)
661    }
662
663    /// Call the endpoint on all actors and collect all responses into a ValueMesh.
664    fn call<'py>(
665        &self,
666        py: Python<'py>,
667        args: &Bound<'py, PyTuple>,
668        kwargs: Option<&Bound<'py, PyDict>>,
669    ) -> PyResult<Py<PyAny>> {
670        let instance = self.get_current_instance(py)?;
671        let span_guard = self.enter_endpoint_span(EndpointAdverb::Call, instance.self_addr());
672
673        let extent = self.get_extent(py)?;
674        let method_name = self.get_method_name().to_string();
675        let (port_ref, receiver) = self.open_reduce_response_port(&instance);
676
677        let supervision_monitor = self.get_supervision_monitor();
678        let qualified_endpoint_name = self.get_qualified_name();
679
680        let caller_headers = self.build_operation_context_headers(EndpointAdverb::Call);
681        self.send_message_with_headers(
682            py,
683            args,
684            kwargs,
685            Some(EitherPortRef::Once(port_ref)),
686            sel!(*),
687            &instance,
688            caller_headers,
689        )?;
690
691        let instance_for_task = instance.clone_for_py();
692        let task: PyPythonTask = PythonTask::new(async move {
693            let _span_guard = span_guard;
694            collect_valuemesh(
695                extent,
696                receiver,
697                method_name,
698                supervision_monitor,
699                &instance_for_task,
700                qualified_endpoint_name,
701            )
702            .await
703        })?
704        .into();
705
706        wrap_in_future(py, task)
707    }
708
709    /// Load balanced sends a message to one chosen actor and awaits a result.
710    fn choose<'py>(
711        &self,
712        py: Python<'py>,
713        args: &Bound<'py, PyTuple>,
714        kwargs: Option<&Bound<'py, PyDict>>,
715    ) -> PyResult<Py<PyAny>> {
716        let instance = self.get_current_instance(py)?;
717        let span_guard = self.enter_endpoint_span(EndpointAdverb::Choose, instance.self_addr());
718        let (port_ref, receiver) = self.open_response_port(&instance);
719
720        let caller_headers = self.build_operation_context_headers(EndpointAdverb::Choose);
721        self.send_message_with_headers(
722            py,
723            args,
724            kwargs,
725            Some(EitherPortRef::Unbounded(port_ref)),
726            sel!(?),
727            &instance,
728            caller_headers,
729        )?;
730
731        let task = value_collector(
732            receiver,
733            self.get_method_name().to_string(),
734            self.get_supervision_monitor(),
735            instance.clone_for_py(),
736            self.get_qualified_name(),
737            EndpointAdverb::Choose,
738            span_guard,
739        )?;
740
741        wrap_in_future(py, task)
742    }
743
744    /// Call the endpoint on exactly one actor (the mesh must have exactly one actor).
745    fn call_one<'py>(
746        &self,
747        py: Python<'py>,
748        args: &Bound<'py, PyTuple>,
749        kwargs: Option<&Bound<'py, PyDict>>,
750    ) -> PyResult<Py<PyAny>> {
751        let extent = self.get_extent(py)?;
752
753        if extent.num_ranks() != 1 {
754            return Err(pyo3::exceptions::PyValueError::new_err(format!(
755                "call_one requires exactly 1 actor, but mesh has {}",
756                extent.num_ranks()
757            )));
758        }
759
760        let instance = self.get_current_instance(py)?;
761        let span_guard = self.enter_endpoint_span(EndpointAdverb::CallOne, instance.self_addr());
762        let (port_ref, receiver) = self.open_response_port(&instance);
763
764        let caller_headers = self.build_operation_context_headers(EndpointAdverb::CallOne);
765        self.send_message_with_headers(
766            py,
767            args,
768            kwargs,
769            Some(EitherPortRef::Unbounded(port_ref)),
770            sel!(*),
771            &instance,
772            caller_headers,
773        )?;
774
775        let task = value_collector(
776            receiver,
777            self.get_method_name().to_string(),
778            self.get_supervision_monitor(),
779            instance.clone_for_py(),
780            self.get_qualified_name(),
781            EndpointAdverb::CallOne,
782            span_guard,
783        )?;
784
785        wrap_in_future(py, task)
786    }
787
788    /// Call the endpoint on all actors and return an iterator of Futures.
789    fn stream<'py>(
790        &self,
791        py: Python<'py>,
792        args: &Bound<'py, PyTuple>,
793        kwargs: Option<&Bound<'py, PyDict>>,
794    ) -> PyResult<Py<PyAny>> {
795        let extent = self.get_extent(py)?;
796        let method_name = self.get_method_name().to_string();
797
798        let instance = self.get_current_instance(py)?;
799        let (port_ref, receiver) = self.open_response_port(&instance);
800
801        let caller_headers = self.build_operation_context_headers(EndpointAdverb::Stream);
802        self.send_message_with_headers(
803            py,
804            args,
805            kwargs,
806            Some(EitherPortRef::Unbounded(port_ref)),
807            sel!(*),
808            &instance,
809            caller_headers,
810        )?;
811
812        let actor_count = extent.num_ranks();
813        let start = tokio::time::Instant::now();
814        let supervision_monitor = self.get_supervision_monitor();
815        let qualified_endpoint_name = self.get_qualified_name();
816        let future_class = make_future(py).unbind();
817
818        let attributes = hyperactor_telemetry::kv_pairs!(
819            "method" => method_name.clone()
820        );
821        ENDPOINT_STREAM_THROUGHPUT.add(1, attributes);
822
823        let stream = PyValueStream {
824            receiver: Arc::new(tokio::sync::Mutex::new(receiver)),
825            supervision_monitor,
826            instance: instance.clone_for_py(),
827            remaining: AtomicUsize::new(actor_count),
828            method_name,
829            qualified_endpoint_name,
830            start,
831            actor_count,
832            future_class,
833        };
834
835        Ok(stream.into_pyobject(py)?.unbind().into())
836    }
837
838    /// Send a message to all actors without waiting for responses (fire-and-forget).
839    fn broadcast<'py>(
840        &self,
841        py: Python<'py>,
842        args: &Bound<'py, PyTuple>,
843        kwargs: Option<&Bound<'py, PyDict>>,
844    ) -> PyResult<()> {
845        let instance = self.get_current_instance(py)?;
846        let method_name = self.get_method_name();
847        let attributes = hyperactor_telemetry::kv_pairs!(
848            "method" => method_name.to_string()
849        );
850
851        match self.send_message(py, args, kwargs, None, sel!(*), &instance) {
852            Ok(()) => {
853                ENDPOINT_BROADCAST_THROUGHPUT.add(1, attributes);
854                Ok(())
855            }
856            Err(e) => {
857                ENDPOINT_BROADCAST_ERROR.add(1, attributes);
858                Err(e)
859            }
860        }
861    }
862}
863
864#[pyclass(
865    name = "ActorEndpoint",
866    module = "monarch._rust_bindings.monarch_hyperactor.endpoint"
867)]
868pub struct ActorEndpoint {
869    inner: Arc<dyn SupervisableActorMesh>,
870    shape: Shape,
871    method: MethodSpecifier,
872    mesh_name: String,
873    signature: Option<Py<PyAny>>,
874    proc_mesh: Option<Py<PyAny>>,
875    propagator: Option<Py<PyAny>>,
876}
877
878impl ActorEndpoint {
879    fn create_message<'py>(
880        &self,
881        py: Python<'py>,
882        args: &Bound<'py, PyTuple>,
883        kwargs: Option<&Bound<'py, PyDict>>,
884        port_ref: Option<EitherPortRef>,
885    ) -> PyResult<PendingMessage> {
886        let port_ref_py: Py<PyAny> = match port_ref {
887            Some(pr) => pr.clone().into_pyobject(py)?.unbind(),
888            None => py.None(),
889        };
890
891        let result = create_endpoint_message(py).call1((
892            self.method.clone(),
893            self.signature
894                .as_ref()
895                .map_or_else(|| py.None(), |s| s.clone_ref(py)),
896            args,
897            kwargs
898                .map_or_else(|| PyDict::new(py), |d| d.clone())
899                .into_any(),
900            port_ref_py,
901            self.proc_mesh
902                .as_ref()
903                .map_or_else(|| py.None(), |p| p.clone_ref(py)),
904        ))?;
905        let mut pending: PyRefMut<'_, PendingMessage> = result.extract()?;
906        pending.take()
907    }
908}
909
910impl Endpoint for ActorEndpoint {
911    fn get_extent(&self, _py: Python<'_>) -> PyResult<Extent> {
912        Ok(self.shape.extent())
913    }
914
915    fn get_method_name(&self) -> &str {
916        self.method.name()
917    }
918
919    fn send_message<'py>(
920        &self,
921        py: Python<'py>,
922        args: &Bound<'py, PyTuple>,
923        kwargs: Option<&Bound<'py, PyDict>>,
924        port_ref: Option<EitherPortRef>,
925        selection: Selection,
926        instance: &Instance<PythonActor>,
927    ) -> PyResult<()> {
928        let message = self.create_message(py, args, kwargs, port_ref)?;
929        self.inner.cast_unresolved(message, selection, instance)
930    }
931
932    fn send_message_with_headers<'py>(
933        &self,
934        py: Python<'py>,
935        args: &Bound<'py, PyTuple>,
936        kwargs: Option<&Bound<'py, PyDict>>,
937        port_ref: Option<EitherPortRef>,
938        selection: Selection,
939        instance: &Instance<PythonActor>,
940        caller_headers: hyperactor_config::Flattrs,
941    ) -> PyResult<()> {
942        let message = self.create_message(py, args, kwargs, port_ref)?;
943        self.inner
944            .cast_unresolved_with_headers(message, selection, instance, caller_headers)
945    }
946
947    fn get_supervision_monitor(&self) -> Option<Arc<dyn Supervisable>> {
948        Some(self.inner.clone())
949    }
950
951    fn get_qualified_name(&self) -> Option<String> {
952        Some(format!("{}.{}()", self.mesh_name, self.method.name()))
953    }
954
955    fn enter_endpoint_span(&self, adverb: EndpointAdverb, actor_id: &ActorAddr) -> SpanGuard {
956        let mesh = self.mesh_name.as_str();
957        let method = self.method.name();
958        SpanGuard::actor_endpoint(adverb.as_str(), actor_id, mesh, method)
959    }
960}
961
962#[pymethods]
963impl ActorEndpoint {
964    /// Create a new ActorEndpoint.
965    #[new]
966    #[pyo3(signature = (actor_mesh, method, shape, mesh_name, signature=None, proc_mesh=None, propagator=None))]
967    fn new(
968        actor_mesh: PythonActorMesh,
969        method: MethodSpecifier,
970        shape: PyShape,
971        mesh_name: String,
972        signature: Option<Py<PyAny>>,
973        proc_mesh: Option<Py<PyAny>>,
974        propagator: Option<Py<PyAny>>,
975    ) -> Self {
976        Self {
977            inner: actor_mesh.get_inner(),
978            shape: shape.get_inner().clone(),
979            method,
980            mesh_name,
981            signature,
982            proc_mesh,
983            propagator,
984        }
985    }
986
987    /// Get the method specifier (used by actor_rref for tensor dispatch).
988    #[getter]
989    fn _name(&self) -> MethodSpecifier {
990        self.method.clone()
991    }
992
993    /// Get the signature (used for argument checking in _dispatch_actor_rref).
994    #[getter]
995    fn _signature(&self, py: Python<'_>) -> Py<PyAny> {
996        self.signature
997            .clone()
998            .unwrap_or_else(|| py.None().into_any())
999    }
1000
1001    /// Get the actor mesh (used by actor_rref for sending messages).
1002    #[getter]
1003    fn _actor_mesh(&self) -> PythonActorMesh {
1004        PythonActorMesh::from_impl(self.inner.clone())
1005    }
1006
1007    /// Propagation method for tensor shape inference.
1008    /// Delegates to Python _do_propagate helper.
1009    fn _propagate<'py>(
1010        &self,
1011        py: Python<'py>,
1012        args: &Bound<'py, PyAny>,
1013        kwargs: &Bound<'py, PyAny>,
1014        fake_args: &Bound<'py, PyAny>,
1015        fake_kwargs: &Bound<'py, PyAny>,
1016    ) -> PyResult<Py<PyAny>> {
1017        let do_propagate = py
1018            .import("monarch._src.actor.endpoint")?
1019            .getattr("_do_propagate")?;
1020        let propagator = self
1021            .propagator
1022            .as_ref()
1023            .map(|p| p.clone_ref(py).into_bound(py))
1024            .unwrap_or_else(|| py.None().into_bound(py));
1025        let cache = PyDict::new(py);
1026        do_propagate
1027            .call1((&propagator, args, kwargs, fake_args, fake_kwargs, cache))?
1028            .extract()
1029    }
1030
1031    /// Propagation for fetch operations.
1032    /// Returns None if no propagator is provided, otherwise calls _propagate.
1033    fn _fetch_propagate<'py>(
1034        &self,
1035        py: Python<'py>,
1036        args: &Bound<'py, PyAny>,
1037        kwargs: &Bound<'py, PyAny>,
1038        fake_args: &Bound<'py, PyAny>,
1039        fake_kwargs: &Bound<'py, PyAny>,
1040    ) -> PyResult<Py<PyAny>> {
1041        if self.propagator.is_none() {
1042            return Ok(py.None());
1043        }
1044        self._propagate(py, args, kwargs, fake_args, fake_kwargs)
1045    }
1046
1047    /// Propagation for pipe operations.
1048    /// Requires an explicit callable propagator.
1049    fn _pipe_propagate<'py>(
1050        &self,
1051        py: Python<'py>,
1052        args: &Bound<'py, PyAny>,
1053        kwargs: &Bound<'py, PyAny>,
1054        fake_args: &Bound<'py, PyAny>,
1055        fake_kwargs: &Bound<'py, PyAny>,
1056    ) -> PyResult<Py<PyAny>> {
1057        // Check if propagator is callable
1058        let is_callable = self
1059            .propagator
1060            .as_ref()
1061            .map(|p| p.bind(py).is_callable())
1062            .unwrap_or(false);
1063        if !is_callable {
1064            return Err(pyo3::exceptions::PyValueError::new_err(
1065                "Must specify explicit callable for pipe",
1066            ));
1067        }
1068        self._propagate(py, args, kwargs, fake_args, fake_kwargs)
1069    }
1070
1071    /// Get the rref result by calling the Python dispatch helper.
1072    #[pyo3(signature = (*args, **kwargs))]
1073    fn rref<'py>(
1074        slf: PyRef<'py, Self>,
1075        py: Python<'py>,
1076        args: &Bound<'py, PyTuple>,
1077        kwargs: Option<&Bound<'py, PyDict>>,
1078    ) -> PyResult<Py<PyAny>> {
1079        let kwargs_dict = kwargs.map_or_else(|| PyDict::new(py), |d| d.clone());
1080
1081        // Call _dispatch_actor_rref(endpoint, args, kwargs)
1082        let result = dispatch_actor_rref(py).call1((slf.into_pyobject(py)?, args, kwargs_dict))?;
1083
1084        Ok(result.unbind())
1085    }
1086
1087    /// Call the endpoint on all actors and collect all responses into a ValueMesh.
1088    #[pyo3(signature = (*args, **kwargs), name = "call")]
1089    fn py_call<'py>(
1090        &self,
1091        py: Python<'py>,
1092        args: &Bound<'py, PyTuple>,
1093        kwargs: Option<&Bound<'py, PyDict>>,
1094    ) -> PyResult<Py<PyAny>> {
1095        self.call(py, args, kwargs)
1096    }
1097
1098    /// Load balanced sends a message to one chosen actor and awaits a result.
1099    #[pyo3(signature = (*args, **kwargs), name = "choose")]
1100    fn py_choose<'py>(
1101        &self,
1102        py: Python<'py>,
1103        args: &Bound<'py, PyTuple>,
1104        kwargs: Option<&Bound<'py, PyDict>>,
1105    ) -> PyResult<Py<PyAny>> {
1106        self.choose(py, args, kwargs)
1107    }
1108
1109    /// Call the endpoint on exactly one actor (the mesh must have exactly one actor).
1110    #[pyo3(signature = (*args, **kwargs), name = "call_one")]
1111    fn py_call_one<'py>(
1112        &self,
1113        py: Python<'py>,
1114        args: &Bound<'py, PyTuple>,
1115        kwargs: Option<&Bound<'py, PyDict>>,
1116    ) -> PyResult<Py<PyAny>> {
1117        self.call_one(py, args, kwargs)
1118    }
1119
1120    /// Call the endpoint on all actors and return an iterator of Futures.
1121    #[pyo3(signature = (*args, **kwargs), name = "stream")]
1122    fn py_stream<'py>(
1123        &self,
1124        py: Python<'py>,
1125        args: &Bound<'py, PyTuple>,
1126        kwargs: Option<&Bound<'py, PyDict>>,
1127    ) -> PyResult<Py<PyAny>> {
1128        self.stream(py, args, kwargs)
1129    }
1130
1131    /// Send a message to all actors without waiting for responses (fire-and-forget).
1132    #[pyo3(signature = (*args, **kwargs), name = "broadcast")]
1133    fn py_broadcast<'py>(
1134        &self,
1135        py: Python<'py>,
1136        args: &Bound<'py, PyTuple>,
1137        kwargs: Option<&Bound<'py, PyDict>>,
1138    ) -> PyResult<()> {
1139        self.broadcast(py, args, kwargs)
1140    }
1141
1142    /// Send a message with optional port for response (used by actor_mesh.send).
1143    fn _send<'py>(
1144        &self,
1145        py: Python<'py>,
1146        args: &Bound<'py, PyTuple>,
1147        kwargs: &Bound<'py, PyDict>,
1148        port: Option<EitherPortRef>,
1149        selection: &str,
1150    ) -> PyResult<()> {
1151        let instance = self.get_current_instance(py)?;
1152        let sel = to_hy_sel(selection)?;
1153        self.send_message(py, args, Some(kwargs), port, sel, &instance)
1154    }
1155}
1156
1157/// A Rust wrapper for Python's RemoteImpl endpoint.
1158///
1159/// This allows us to implement the adverb methods (call, choose, call_one, stream, broadcast)
1160/// in Rust while delegating the actual send logic to the Python RemoteImpl._send() method.
1161#[pyclass(
1162    name = "Remote",
1163    module = "monarch._rust_bindings.monarch_hyperactor.endpoint"
1164)]
1165pub struct Remote {
1166    /// The wrapped Python RemoteImpl object
1167    inner: Py<PyAny>,
1168}
1169
1170impl Endpoint for Remote {
1171    fn get_extent(&self, py: Python<'_>) -> PyResult<Extent> {
1172        let extent: PyExtent = self.inner.call_method0(py, "_get_extent")?.extract(py)?;
1173        Ok(extent.into())
1174    }
1175
1176    fn get_method_name(&self) -> &str {
1177        "unknown"
1178    }
1179
1180    fn send_message<'py>(
1181        &self,
1182        py: Python<'py>,
1183        args: &Bound<'py, PyTuple>,
1184        kwargs: Option<&Bound<'py, PyDict>>,
1185        port_ref: Option<EitherPortRef>,
1186        selection: Selection,
1187        _instance: &Instance<PythonActor>,
1188    ) -> PyResult<()> {
1189        let send_kwargs = PyDict::new(py);
1190        match port_ref {
1191            Some(pr) => send_kwargs.set_item("port", pr.clone())?,
1192            None => send_kwargs.set_item("port", py.None())?,
1193        }
1194
1195        let selection_str = match selection {
1196            Selection::All(inner) if matches!(*inner, Selection::True) => "all",
1197            Selection::Any(inner) if matches!(*inner, Selection::True) => "choose",
1198            _ => {
1199                panic!("only sel!(*) and sel!(?) should be provided as selection for send_message")
1200            }
1201        };
1202
1203        send_kwargs.set_item("selection", selection_str)?;
1204
1205        let kwargs_dict = kwargs.map_or_else(|| PyDict::new(py), |d| d.clone());
1206        self.inner
1207            .call_method(py, "_send", (args.clone(), kwargs_dict), Some(&send_kwargs))?;
1208
1209        Ok(())
1210    }
1211
1212    fn get_supervision_monitor(&self) -> Option<Arc<dyn Supervisable>> {
1213        None // Remote endpoints don't have supervision_monitors
1214    }
1215
1216    fn get_qualified_name(&self) -> Option<String> {
1217        None // Remote endpoints don't have qualified names
1218    }
1219
1220    fn enter_endpoint_span(&self, adverb: EndpointAdverb, actor_id: &ActorAddr) -> SpanGuard {
1221        let call_name = Python::attach(|py| {
1222            self.inner
1223                .call_method0(py, "_call_name")
1224                .ok()
1225                .and_then(|v| v.extract::<String>(py).ok())
1226        });
1227        let call_name = call_name.as_deref().unwrap_or("");
1228        SpanGuard::remote(adverb.as_str(), actor_id, call_name)
1229    }
1230}
1231
1232#[pymethods]
1233impl Remote {
1234    /// Create a new Remote wrapping a Python RemoteImpl object.
1235    #[new]
1236    fn new(remote: Py<PyAny>) -> Self {
1237        Self { inner: remote }
1238    }
1239
1240    /// Call the endpoint on all actors and collect all responses into a ValueMesh.
1241    #[pyo3(signature = (*args, **kwargs), name = "call")]
1242    fn py_call<'py>(
1243        &self,
1244        py: Python<'py>,
1245        args: &Bound<'py, PyTuple>,
1246        kwargs: Option<&Bound<'py, PyDict>>,
1247    ) -> PyResult<Py<PyAny>> {
1248        self.call(py, args, kwargs)
1249    }
1250
1251    /// Load balanced sends a message to one chosen actor and awaits a result.
1252    #[pyo3(signature = (*args, **kwargs), name = "choose")]
1253    fn py_choose<'py>(
1254        &self,
1255        py: Python<'py>,
1256        args: &Bound<'py, PyTuple>,
1257        kwargs: Option<&Bound<'py, PyDict>>,
1258    ) -> PyResult<Py<PyAny>> {
1259        self.choose(py, args, kwargs)
1260    }
1261
1262    /// Call the endpoint on exactly one actor (the mesh must have exactly one actor).
1263    #[pyo3(signature = (*args, **kwargs), name = "call_one")]
1264    fn py_call_one<'py>(
1265        &self,
1266        py: Python<'py>,
1267        args: &Bound<'py, PyTuple>,
1268        kwargs: Option<&Bound<'py, PyDict>>,
1269    ) -> PyResult<Py<PyAny>> {
1270        self.call_one(py, args, kwargs)
1271    }
1272
1273    /// Call the endpoint on all actors and return an iterator of Futures.
1274    #[pyo3(signature = (*args, **kwargs), name = "stream")]
1275    fn py_stream<'py>(
1276        &self,
1277        py: Python<'py>,
1278        args: &Bound<'py, PyTuple>,
1279        kwargs: Option<&Bound<'py, PyDict>>,
1280    ) -> PyResult<Py<PyAny>> {
1281        self.stream(py, args, kwargs)
1282    }
1283
1284    /// Send a message to all actors without waiting for responses (fire-and-forget).
1285    #[pyo3(signature = (*args, **kwargs), name = "broadcast")]
1286    fn py_broadcast<'py>(
1287        &self,
1288        py: Python<'py>,
1289        args: &Bound<'py, PyTuple>,
1290        kwargs: Option<&Bound<'py, PyDict>>,
1291    ) -> PyResult<()> {
1292        self.broadcast(py, args, kwargs)
1293    }
1294
1295    /// Get the rref result by calling the wrapped Remote's rref method.
1296    #[pyo3(signature = (*args, **kwargs))]
1297    fn rref<'py>(
1298        &self,
1299        py: Python<'py>,
1300        args: &Bound<'py, PyTuple>,
1301        kwargs: Option<&Bound<'py, PyDict>>,
1302    ) -> PyResult<Py<PyAny>> {
1303        let kwargs_dict = kwargs.map_or_else(|| PyDict::new(py), |d| d.clone());
1304        self.inner.call_method(py, "rref", args, Some(&kwargs_dict))
1305    }
1306
1307    /// Get the call name by delegating to the wrapped Remote's _call_name.
1308    fn _call_name(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1309        self.inner.call_method0(py, "_call_name")
1310    }
1311
1312    /// Get the maybe_resolvable property from the wrapped RemoteImpl.
1313    #[getter]
1314    fn _maybe_resolvable(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1315        self.inner.getattr(py, "_maybe_resolvable")
1316    }
1317
1318    /// Get the resolvable property from the wrapped RemoteImpl.
1319    #[getter]
1320    fn _resolvable(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1321        self.inner.getattr(py, "_resolvable")
1322    }
1323
1324    /// Get the remote_impl from the wrapped RemoteImpl.
1325    /// This is needed for function_to_import_path() in function.py to work correctly.
1326    #[getter]
1327    fn _remote_impl(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1328        self.inner.getattr(py, "_remote_impl")
1329    }
1330
1331    /// Propagation method for tensor shape inference.
1332    /// Delegates to the wrapped Remote's _propagate.
1333    fn _propagate<'py>(
1334        &self,
1335        py: Python<'py>,
1336        args: &Bound<'py, PyAny>,
1337        kwargs: &Bound<'py, PyAny>,
1338        fake_args: &Bound<'py, PyAny>,
1339        fake_kwargs: &Bound<'py, PyAny>,
1340    ) -> PyResult<Py<PyAny>> {
1341        self.inner
1342            .call_method1(py, "_propagate", (args, kwargs, fake_args, fake_kwargs))
1343    }
1344
1345    /// Propagation for fetch operations.
1346    /// Delegates to the wrapped Remote's _fetch_propagate.
1347    fn _fetch_propagate<'py>(
1348        &self,
1349        py: Python<'py>,
1350        args: &Bound<'py, PyAny>,
1351        kwargs: &Bound<'py, PyAny>,
1352        fake_args: &Bound<'py, PyAny>,
1353        fake_kwargs: &Bound<'py, PyAny>,
1354    ) -> PyResult<Py<PyAny>> {
1355        self.inner.call_method1(
1356            py,
1357            "_fetch_propagate",
1358            (args, kwargs, fake_args, fake_kwargs),
1359        )
1360    }
1361
1362    /// Propagation for pipe operations.
1363    /// Delegates to the wrapped Remote's _pipe_propagate.
1364    fn _pipe_propagate<'py>(
1365        &self,
1366        py: Python<'py>,
1367        args: &Bound<'py, PyAny>,
1368        kwargs: &Bound<'py, PyAny>,
1369        fake_args: &Bound<'py, PyAny>,
1370        fake_kwargs: &Bound<'py, PyAny>,
1371    ) -> PyResult<Py<PyAny>> {
1372        self.inner.call_method1(
1373            py,
1374            "_pipe_propagate",
1375            (args, kwargs, fake_args, fake_kwargs),
1376        )
1377    }
1378
1379    /// Send a message with optional port for response.
1380    /// Delegates to the wrapped RemoteImpl's _send.
1381    fn _send<'py>(
1382        &self,
1383        py: Python<'py>,
1384        args: &Bound<'py, PyTuple>,
1385        kwargs: &Bound<'py, PyDict>,
1386        port: Option<Py<PyAny>>,
1387        selection: &str,
1388    ) -> PyResult<()> {
1389        self.inner.call_method(
1390            py,
1391            "_send",
1392            (args, kwargs),
1393            Some(&{
1394                let d = PyDict::new(py);
1395                d.set_item("port", port.unwrap_or_else(|| py.None()))?;
1396                d.set_item("selection", selection)?;
1397                d
1398            }),
1399        )?;
1400        Ok(())
1401    }
1402
1403    /// Make RemoteEndpoint callable - delegates to rref() like Remote.__call__.
1404    #[pyo3(signature = (*args, **kwargs))]
1405    fn __call__<'py>(
1406        &self,
1407        py: Python<'py>,
1408        args: &Bound<'py, PyTuple>,
1409        kwargs: Option<&Bound<'py, PyDict>>,
1410    ) -> PyResult<Py<PyAny>> {
1411        self.rref(py, args, kwargs)
1412    }
1413}
1414
1415pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
1416    module.add_class::<PyValueStream>()?;
1417    module.add_class::<ActorEndpoint>()?;
1418    module.add_class::<Remote>()?;
1419
1420    Ok(())
1421}
1422
1423#[derive(Named)]
1424struct PythonResponseMessageReducer;
1425
1426impl CommReducer for PythonResponseMessageReducer {
1427    type Update = PythonMessage;
1428
1429    fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
1430        Ok(ValueOverlay::try_from_runs(rle::merge_value_runs(
1431            left.into_overlay()?.into_runs(),
1432            right.into_overlay()?.into_runs(),
1433        ))?
1434        .into())
1435    }
1436}
1437
1438inventory::submit! {
1439    ReducerFactory {
1440        typehash_f: <PythonResponseMessageReducer as Named>::typehash,
1441        builder_f: |_| Ok(Box::new(PythonResponseMessageReducer)),
1442    }
1443}
1444
1445struct PythonResponseMessageAccumulator;
1446
1447impl Accumulator for PythonResponseMessageAccumulator {
1448    type State = PythonMessage;
1449    type Update = PythonMessage;
1450
1451    fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> {
1452        *state = ValueOverlay::try_from_runs(rle::merge_value_runs(
1453            std::mem::take(state).into_overlay()?.into_runs(),
1454            update.into_overlay()?.into_runs(),
1455        ))?
1456        .into();
1457
1458        Ok(())
1459    }
1460
1461    fn reducer_spec(&self) -> Option<ReducerSpec> {
1462        Some(ReducerSpec {
1463            typehash: <PythonResponseMessageReducer as Named>::typehash(),
1464            builder_params: None,
1465        })
1466    }
1467}
1468
#[cfg(test)]
mod tests {
    use hyperactor::ActorAddr;
    use hyperactor::mailbox::headers::OPERATION_ADVERB;
    use hyperactor::mailbox::headers::OPERATION_ENDPOINT;

    use super::*;

    /// Stub `Endpoint` whose only meaningful answer is `get_qualified_name`.
    /// The tests below call just the default `build_operation_context_headers`,
    /// which is expected not to touch the other methods, so they are marked
    /// unreachable.
    struct TestEndpoint {
        qualified_name: Option<String>,
    }

    impl Endpoint for TestEndpoint {
        fn get_extent(&self, _py: Python<'_>) -> PyResult<Extent> {
            unreachable!()
        }

        fn get_method_name(&self) -> &str {
            unreachable!()
        }

        fn send_message<'py>(
            &self,
            _py: Python<'py>,
            _args: &Bound<'py, PyTuple>,
            _kwargs: Option<&Bound<'py, PyDict>>,
            _port_ref: Option<EitherPortRef>,
            _selection: Selection,
            _instance: &Instance<PythonActor>,
        ) -> PyResult<()> {
            unreachable!()
        }

        fn get_supervision_monitor(&self) -> Option<Arc<dyn Supervisable>> {
            None
        }

        fn get_qualified_name(&self) -> Option<String> {
            self.qualified_name.clone()
        }

        fn enter_endpoint_span(&self, _adverb: EndpointAdverb, _actor_id: &ActorAddr) -> SpanGuard {
            unreachable!()
        }
    }

    /// OC-1 request-side producer: every `EndpointAdverb` is stamped as its
    /// expected wire string in `OPERATION_ADVERB`, and the name returned by
    /// `get_qualified_name()` ends up in `OPERATION_ENDPOINT`.
    #[test]
    fn test_rc1_build_operation_context_headers_stamps_each_adverb() {
        const QUALIFIED_NAME: &str = "training.Philosopher.ping()";

        let endpoint = TestEndpoint {
            qualified_name: Some(QUALIFIED_NAME.to_string()),
        };
        let cases = [
            (EndpointAdverb::Call, "call"),
            (EndpointAdverb::CallOne, "call_one"),
            (EndpointAdverb::Choose, "choose"),
            (EndpointAdverb::Stream, "stream"),
        ];

        for (adverb, wire_adverb) in cases {
            let headers = endpoint.build_operation_context_headers(adverb);
            assert_eq!(
                headers.get(OPERATION_ENDPOINT).as_deref(),
                Some(QUALIFIED_NAME),
                "adverb {:?}: OPERATION_ENDPOINT",
                adverb,
            );
            assert_eq!(
                headers.get(OPERATION_ADVERB).as_deref(),
                Some(wire_adverb),
                "adverb {:?}: OPERATION_ADVERB",
                adverb,
            );
        }
    }

    /// OC-1 request-side producer: an endpoint without a qualified name
    /// (as `Remote::get_qualified_name` returns `None`) yields headers with
    /// no `OPERATION_ENDPOINT`, while `OPERATION_ADVERB` is still present.
    #[test]
    fn test_rc1_build_operation_context_headers_omits_endpoint_when_no_qualified_name() {
        let endpoint = TestEndpoint {
            qualified_name: None,
        };
        let headers = endpoint.build_operation_context_headers(EndpointAdverb::CallOne);

        let stamped_endpoint = headers.get(OPERATION_ENDPOINT);
        assert!(
            stamped_endpoint.is_none(),
            "OPERATION_ENDPOINT must be absent when endpoint has no qualified name",
        );

        let stamped_adverb = headers.get(OPERATION_ADVERB);
        assert_eq!(
            stamped_adverb.as_deref(),
            Some("call_one"),
            "OPERATION_ADVERB should still be stamped",
        );
    }
}