1use std::cell::Cell;
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12use std::sync::atomic::Ordering;
13
14use hyperactor::ActorAddr;
15use hyperactor::Instance;
16use hyperactor::accum::Accumulator;
17use hyperactor::accum::CommReducer;
18use hyperactor::accum::ReducerFactory;
19use hyperactor::accum::ReducerSpec;
20use hyperactor::mailbox::OncePortReceiver;
21use hyperactor::mailbox::PortReceiver;
22use hyperactor_mesh::sel;
23use hyperactor_mesh::value_mesh::ValueOverlay;
24use hyperactor_mesh::value_mesh::rle;
25use monarch_types::py_global;
26use ndslice::Extent;
27use ndslice::Selection;
28use ndslice::Shape;
29use pyo3::prelude::*;
30use pyo3::types::PyDict;
31use pyo3::types::PyTuple;
32use serde_multipart::Part;
33use typeuri::Named;
34
35use crate::actor::MethodSpecifier;
36use crate::actor::PythonActor;
37use crate::actor::PythonMessage;
38use crate::actor::PythonMessageKind;
39use crate::actor::PythonResponseMessage;
40use crate::actor_mesh::PythonActorMesh;
41use crate::actor_mesh::SupervisableActorMesh;
42use crate::actor_mesh::to_hy_sel;
43use crate::buffers::FrozenBuffer;
44use crate::context::PyInstance;
45use crate::mailbox::EitherPortRef;
46use crate::mailbox::PythonOncePortRef;
47use crate::mailbox::PythonPortRef;
48use crate::metrics::ENDPOINT_BROADCAST_ERROR;
49use crate::metrics::ENDPOINT_BROADCAST_THROUGHPUT;
50use crate::metrics::ENDPOINT_CALL_ERROR;
51use crate::metrics::ENDPOINT_CALL_LATENCY_US_HISTOGRAM;
52use crate::metrics::ENDPOINT_CALL_ONE_ERROR;
53use crate::metrics::ENDPOINT_CALL_ONE_LATENCY_US_HISTOGRAM;
54use crate::metrics::ENDPOINT_CALL_ONE_THROUGHPUT;
55use crate::metrics::ENDPOINT_CALL_THROUGHPUT;
56use crate::metrics::ENDPOINT_CHOOSE_ERROR;
57use crate::metrics::ENDPOINT_CHOOSE_LATENCY_US_HISTOGRAM;
58use crate::metrics::ENDPOINT_CHOOSE_THROUGHPUT;
59use crate::metrics::ENDPOINT_STREAM_ERROR;
60use crate::metrics::ENDPOINT_STREAM_LATENCY_US_HISTOGRAM;
61use crate::metrics::ENDPOINT_STREAM_THROUGHPUT;
62use crate::pickle::PendingMessage;
63use crate::pickle::unpickle;
64use crate::pytokio::PyPythonTask;
65use crate::pytokio::PythonTask;
66use crate::shape::PyExtent;
67use crate::shape::PyShape;
68use crate::supervision::Supervisable;
69use crate::supervision::SupervisionError;
70use crate::value_mesh::PyValueMesh;
71
72py_global!(get_context, "monarch._src.actor.actor_mesh", "context");
73py_global!(
74 create_endpoint_message,
75 "monarch._src.actor.actor_mesh",
76 "_create_endpoint_message"
77);
78py_global!(
79 dispatch_actor_rref,
80 "monarch._src.actor.actor_mesh",
81 "_dispatch_actor_rref"
82);
83py_global!(make_future, "monarch._src.actor.future", "Future");
84
85fn unpickle_from_part(py: Python<'_>, part: Part) -> PyResult<Bound<'_, PyAny>> {
86 unpickle(
87 py,
88 FrozenBuffer {
89 inner: part.into_bytes(),
90 },
91 )
92}
93
94#[derive(Clone, Copy, Debug)]
98pub(crate) enum EndpointAdverb {
99 Call,
100 CallOne,
101 Choose,
102 Stream,
103}
104
105impl EndpointAdverb {
106 fn as_str(self) -> &'static str {
107 match self {
108 Self::Call => "call",
109 Self::CallOne => "call_one",
110 Self::Choose => "choose",
111 Self::Stream => "stream",
112 }
113 }
114}
115
116pub struct RecordEndpointGuard {
121 start: tokio::time::Instant,
122 method_name: String,
123 actor_count: usize,
124 adverb: EndpointAdverb,
125 error_occurred: Cell<bool>,
126}
127
128impl RecordEndpointGuard {
129 fn new(
130 start: tokio::time::Instant,
131 method_name: String,
132 actor_count: usize,
133 adverb: EndpointAdverb,
134 ) -> Self {
135 let attributes = hyperactor_telemetry::kv_pairs!(
136 "method" => method_name.clone()
137 );
138 match adverb {
139 EndpointAdverb::Call => {
140 ENDPOINT_CALL_THROUGHPUT.add(1, attributes);
141 }
142 EndpointAdverb::CallOne => {
143 ENDPOINT_CALL_ONE_THROUGHPUT.add(1, attributes);
144 }
145 EndpointAdverb::Choose => {
146 ENDPOINT_CHOOSE_THROUGHPUT.add(1, attributes);
147 }
148 EndpointAdverb::Stream => {
149 }
151 }
152
153 Self {
154 start,
155 method_name,
156 actor_count,
157 adverb,
158 error_occurred: Cell::new(false),
159 }
160 }
161
162 fn mark_error(&self) {
163 self.error_occurred.set(true);
164 }
165}
166
167impl Drop for RecordEndpointGuard {
168 fn drop(&mut self) {
169 let actor_count_str = self.actor_count.to_string();
170 let attributes = hyperactor_telemetry::kv_pairs!(
171 "method" => self.method_name.clone(),
172 "actor_count" => actor_count_str
173 );
174
175 let duration_us = self.start.elapsed().as_micros();
176
177 match self.adverb {
178 EndpointAdverb::Call => {
179 ENDPOINT_CALL_LATENCY_US_HISTOGRAM.record(duration_us as f64, attributes);
180 }
181 EndpointAdverb::CallOne => {
182 ENDPOINT_CALL_ONE_LATENCY_US_HISTOGRAM.record(duration_us as f64, attributes);
183 }
184 EndpointAdverb::Choose => {
185 ENDPOINT_CHOOSE_LATENCY_US_HISTOGRAM.record(duration_us as f64, attributes);
186 }
187 EndpointAdverb::Stream => {
188 ENDPOINT_STREAM_LATENCY_US_HISTOGRAM.record(duration_us as f64, attributes);
189 }
190 }
191
192 if self.error_occurred.get() {
193 match self.adverb {
194 EndpointAdverb::Call => {
195 ENDPOINT_CALL_ERROR.add(1, attributes);
196 }
197 EndpointAdverb::CallOne => {
198 ENDPOINT_CALL_ONE_ERROR.add(1, attributes);
199 }
200 EndpointAdverb::Choose => {
201 ENDPOINT_CHOOSE_ERROR.add(1, attributes);
202 }
203 EndpointAdverb::Stream => {
204 ENDPOINT_STREAM_ERROR.add(1, attributes);
205 }
206 }
207 }
208 }
209}
210
211pub(crate) struct SpanGuard {
218 id: u64,
219}
220
221impl SpanGuard {
222 fn actor_endpoint(name: &'static str, actor_id: &ActorAddr, mesh: &str, method: &str) -> Self {
223 Self {
224 id: hyperactor_telemetry::start_user_span(
225 name,
226 hyperactor_telemetry::sinks::perfetto::ENDPOINT_TELEMETRY_TARGET,
227 [
228 (
229 "actor_id",
230 hyperactor_telemetry::trace_dispatcher::FieldValue::Str(
231 actor_id.to_string(),
232 ),
233 ),
234 (
235 "mesh",
236 hyperactor_telemetry::trace_dispatcher::FieldValue::Str(mesh.to_string()),
237 ),
238 (
239 "method",
240 hyperactor_telemetry::trace_dispatcher::FieldValue::Str(method.to_string()),
241 ),
242 ],
243 ),
244 }
245 }
246
247 fn remote(name: &'static str, actor_id: &ActorAddr, call_name: &str) -> Self {
248 Self {
249 id: hyperactor_telemetry::start_user_span(
250 name,
251 hyperactor_telemetry::sinks::perfetto::ENDPOINT_TELEMETRY_TARGET,
252 [
253 (
254 "actor_id",
255 hyperactor_telemetry::trace_dispatcher::FieldValue::Str(
256 actor_id.to_string(),
257 ),
258 ),
259 (
260 "call_name",
261 hyperactor_telemetry::trace_dispatcher::FieldValue::Str(
262 call_name.to_string(),
263 ),
264 ),
265 ],
266 ),
267 }
268 }
269}
270
271impl Drop for SpanGuard {
272 fn drop(&mut self) {
273 hyperactor_telemetry::end_user_span(self.id);
274 }
275}
276
277fn supervision_error_to_pyerr(err: PyErr, qualified_endpoint_name: &Option<String>) -> PyErr {
278 match qualified_endpoint_name {
279 Some(endpoint) => {
280 Python::attach(|py| SupervisionError::set_endpoint_on_err(py, err, endpoint.clone()))
281 }
282 None => err,
283 }
284}
285
286async fn collect_value(
287 rx: &mut PortReceiver<PythonMessage>,
288 supervision_monitor: &Option<Arc<dyn Supervisable>>,
289 instance: &Instance<PythonActor>,
290 qualified_endpoint_name: &Option<String>,
291) -> PyResult<(Part, Option<usize>)> {
292 enum RaceResult {
293 Message(Box<PythonMessage>),
294 SupervisionError(PyErr),
295 RecvError(String),
296 }
297
298 let race_result = match supervision_monitor {
299 Some(sup) => {
300 tokio::select! {
301 biased;
302 result = sup.supervision_event(instance) => {
303 match result {
304 Some(err) => RaceResult::SupervisionError(err),
305 None => {
306 match rx.recv().await {
307 Ok(msg) => RaceResult::Message(Box::new(msg)),
308 Err(e) => RaceResult::RecvError(e.to_string()),
309 }
310 }
311 }
312 }
313 msg = rx.recv() => {
314 match msg {
315 Ok(m) => RaceResult::Message(Box::new(m)),
316 Err(e) => RaceResult::RecvError(e.to_string()),
317 }
318 }
319 }
320 }
321 _ => match rx.recv().await {
322 Ok(msg) => RaceResult::Message(Box::new(msg)),
323 Err(e) => RaceResult::RecvError(e.to_string()),
324 },
325 };
326
327 match race_result {
328 RaceResult::Message(boxed) => {
329 let PythonMessage { kind, message, .. } = *boxed;
330 match kind {
331 PythonMessageKind::Result { rank, .. } => Ok((message, rank)),
332 PythonMessageKind::Exception { .. } => {
333 Python::attach(|py| Err(PyErr::from_value(unpickle_from_part(py, message)?)))
334 }
335 other => Err(pyo3::exceptions::PyValueError::new_err(format!(
336 "unexpected message kind {:?}",
337 other
338 ))),
339 }
340 }
341 RaceResult::RecvError(e) => Err(pyo3::exceptions::PyEOFError::new_err(format!(
342 "Port closed: {}",
343 e
344 ))),
345 RaceResult::SupervisionError(err) => {
346 Err(supervision_error_to_pyerr(err, qualified_endpoint_name))
347 }
348 }
349}
350
351async fn collect_valuemesh(
352 extent: Extent,
353 rx: OncePortReceiver<PythonMessage>,
354 method_name: String,
355 supervision_monitor: Option<Arc<dyn Supervisable>>,
356 instance: &Instance<PythonActor>,
357 qualified_endpoint_name: Option<String>,
358) -> PyResult<Py<PyAny>> {
359 let start = tokio::time::Instant::now();
360
361 let expected_count = extent.num_ranks();
362
363 let record_guard = RecordEndpointGuard::new(
364 start,
365 method_name.clone(),
366 expected_count,
367 EndpointAdverb::Call,
368 );
369
370 enum RaceResult {
371 Collected(Box<PythonMessage>),
372 SupervisionError(PyErr),
373 RecvError(String),
374 }
375
376 let race_result = match &supervision_monitor {
377 Some(sup) => {
378 tokio::select! {
379 biased;
380 result = sup.supervision_event(instance) => {
381 match result {
382 Some(err) => RaceResult::SupervisionError(err),
383 None => RaceResult::RecvError(
384 "supervision monitor closed unexpectedly".to_string()
385 ),
386 }
387 }
388 batch = rx.recv() => {
389 match batch {
390 Ok(b) => RaceResult::Collected(Box::new(b)),
391 Err(e) => RaceResult::RecvError(e.to_string()),
392 }
393 }
394 }
395 }
396 None => match rx.recv().await {
397 Ok(batch) => RaceResult::Collected(Box::new(batch)),
398 Err(e) => RaceResult::RecvError(e.to_string()),
399 },
400 };
401
402 match race_result {
403 RaceResult::Collected(boxed) => {
404 let msg = *boxed;
405 let overlay = msg.into_overlay().map_err(|e| {
406 pyo3::exceptions::PyRuntimeError::new_err(format!(
407 "failed to extract overlay from collected responses: {e}"
408 ))
409 })?;
410 Python::attach(|py| {
411 Ok(PyValueMesh::build_from_parts(
412 &extent,
413 overlay.runs().try_fold(
414 Vec::with_capacity(expected_count),
415 |mut parts, (range, payload)| match payload {
416 PythonResponseMessage::Result(part) => {
417 parts.extend(range.clone().map(|_| part.clone()));
418 Ok(parts)
419 }
420 PythonResponseMessage::Exception(part) => {
421 record_guard.mark_error();
422 Python::attach(|py| {
423 Err(PyErr::from_value(unpickle_from_part(py, part.clone())?))
424 })
425 }
426 },
427 )?,
428 )?
429 .into_pyobject(py)?
430 .into_any()
431 .unbind())
432 })
433 }
434 RaceResult::RecvError(e) => {
435 record_guard.mark_error();
436 Err(pyo3::exceptions::PyEOFError::new_err(format!(
437 "Port closed: {}",
438 e
439 )))
440 }
441 RaceResult::SupervisionError(err) => {
442 record_guard.mark_error();
443 Err(supervision_error_to_pyerr(err, &qualified_endpoint_name))
444 }
445 }
446}
447
448fn value_collector(
449 mut receiver: PortReceiver<PythonMessage>,
450 method_name: String,
451 supervision_monitor: Option<Arc<dyn Supervisable>>,
452 instance: Instance<PythonActor>,
453 qualified_endpoint_name: Option<String>,
454 adverb: EndpointAdverb,
455 span_guard: SpanGuard,
456) -> PyResult<PyPythonTask> {
457 Ok(PythonTask::new(async move {
458 let _span_guard = span_guard;
459 let start = tokio::time::Instant::now();
460
461 let record_guard = RecordEndpointGuard::new(start, method_name, 1, adverb);
462
463 match collect_value(
464 &mut receiver,
465 &supervision_monitor,
466 &instance,
467 &qualified_endpoint_name,
468 )
469 .await
470 {
471 Ok((message, _)) => {
472 Python::attach(|py| unpickle_from_part(py, message).map(|obj| obj.unbind()))
473 }
474 Err(e) => {
475 record_guard.mark_error();
476 Err(e)
477 }
478 }
479 })?
480 .into())
481}
482
483#[pyclass(
488 name = "ValueStream",
489 module = "monarch._rust_bindings.monarch_hyperactor.endpoint"
490)]
491pub struct PyValueStream {
492 receiver: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
493 supervision_monitor: Option<Arc<dyn Supervisable>>,
495 instance: Instance<PythonActor>,
496 remaining: AtomicUsize,
497 method_name: String,
498 qualified_endpoint_name: Option<String>,
499 start: tokio::time::Instant,
500 actor_count: usize,
501 future_class: Py<PyAny>,
502}
503
504#[pymethods]
505impl PyValueStream {
506 fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
507 slf
508 }
509
510 fn __next__(&self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
511 let remaining = self.remaining.load(Ordering::Relaxed);
512 if remaining == 0 {
513 return Ok(None);
514 }
515 self.remaining.store(remaining - 1, Ordering::Relaxed);
516
517 let receiver = self.receiver.clone();
518 let supervision_monitor = self.supervision_monitor.clone();
519 let instance = self.instance.clone_for_py();
520 let qualified_endpoint_name = self.qualified_endpoint_name.clone();
521 let start = self.start;
522 let method_name = self.method_name.clone();
523 let actor_count = self.actor_count;
524
525 let task: PyPythonTask = PythonTask::new(async move {
526 let record_guard =
527 RecordEndpointGuard::new(start, method_name, actor_count, EndpointAdverb::Stream);
528
529 let mut rx_guard = receiver.lock().await;
530
531 match collect_value(
532 &mut rx_guard,
533 &supervision_monitor,
534 &instance,
535 &qualified_endpoint_name,
536 )
537 .await
538 {
539 Ok((message, _)) => {
540 Python::attach(|py| unpickle_from_part(py, message).map(|obj| obj.unbind()))
541 }
542 Err(e) => {
543 record_guard.mark_error();
544 Err(e)
545 }
546 }
547 })?
548 .into();
549
550 let kwargs = PyDict::new(py);
551 kwargs.set_item("coro", task)?;
552 let future = self.future_class.call(py, (), Some(&kwargs))?;
553 Ok(Some(future))
554 }
555}
556
557fn wrap_in_future(py: Python<'_>, task: PyPythonTask) -> PyResult<Py<PyAny>> {
558 let kwargs = PyDict::new(py);
559 kwargs.set_item("coro", task)?;
560 let future = make_future(py).call((), Some(&kwargs))?;
561 Ok(future.unbind())
562}
563
564pub(crate) trait Endpoint {
567 fn get_extent(&self, py: Python<'_>) -> PyResult<Extent>;
569
570 fn get_method_name(&self) -> &str;
572
573 fn send_message<'py>(
575 &self,
576 py: Python<'py>,
577 args: &Bound<'py, PyTuple>,
578 kwargs: Option<&Bound<'py, PyDict>>,
579 port_ref: Option<EitherPortRef>,
580 selection: Selection,
581 instance: &Instance<PythonActor>,
582 ) -> PyResult<()>;
583
584 fn send_message_with_headers<'py>(
589 &self,
590 py: Python<'py>,
591 args: &Bound<'py, PyTuple>,
592 kwargs: Option<&Bound<'py, PyDict>>,
593 port_ref: Option<EitherPortRef>,
594 selection: Selection,
595 instance: &Instance<PythonActor>,
596 _caller_headers: hyperactor_config::Flattrs,
597 ) -> PyResult<()> {
598 self.send_message(py, args, kwargs, port_ref, selection, instance)
599 }
600
601 fn build_operation_context_headers(
607 &self,
608 adverb: EndpointAdverb,
609 ) -> hyperactor_config::Flattrs {
610 let adverb_str = match adverb {
611 EndpointAdverb::Call => "call",
612 EndpointAdverb::CallOne => "call_one",
613 EndpointAdverb::Choose => "choose",
614 EndpointAdverb::Stream => "stream",
615 };
616 let attrs = crate::operation_context::build_operation_context_attrs(
617 self.get_qualified_name(),
618 Some(adverb_str),
619 );
620 let mut headers = hyperactor_config::Flattrs::new();
621 crate::operation_context::stamp_operation_context(&mut headers, &attrs);
622 headers
623 }
624
625 fn get_supervision_monitor(&self) -> Option<Arc<dyn Supervisable>>;
627
628 fn get_qualified_name(&self) -> Option<String>;
630
631 fn enter_endpoint_span(&self, adverb: EndpointAdverb, actor_id: &ActorAddr) -> SpanGuard;
638
639 fn get_current_instance(&self, py: Python<'_>) -> PyResult<Instance<PythonActor>> {
640 let context = get_context(py).call0()?;
641 let py_instance: PyRef<PyInstance> = context.getattr("actor_instance")?.extract()?;
642 Ok(py_instance.clone().into_instance())
643 }
644
645 fn open_response_port(
646 &self,
647 instance: &Instance<PythonActor>,
648 ) -> (PythonPortRef, PortReceiver<PythonMessage>) {
649 let (p, receiver) = instance.mailbox_for_py().open_port::<PythonMessage>();
650 (PythonPortRef { inner: p.bind() }, receiver)
651 }
652
653 fn open_reduce_response_port(
654 &self,
655 instance: &Instance<PythonActor>,
656 ) -> (PythonOncePortRef, OncePortReceiver<PythonMessage>) {
657 let (p, receiver) = instance
658 .mailbox_for_py()
659 .open_reduce_port(PythonResponseMessageAccumulator);
660 (PythonOncePortRef::from(p.bind()), receiver)
661 }
662
663 fn call<'py>(
665 &self,
666 py: Python<'py>,
667 args: &Bound<'py, PyTuple>,
668 kwargs: Option<&Bound<'py, PyDict>>,
669 ) -> PyResult<Py<PyAny>> {
670 let instance = self.get_current_instance(py)?;
671 let span_guard = self.enter_endpoint_span(EndpointAdverb::Call, instance.self_addr());
672
673 let extent = self.get_extent(py)?;
674 let method_name = self.get_method_name().to_string();
675 let (port_ref, receiver) = self.open_reduce_response_port(&instance);
676
677 let supervision_monitor = self.get_supervision_monitor();
678 let qualified_endpoint_name = self.get_qualified_name();
679
680 let caller_headers = self.build_operation_context_headers(EndpointAdverb::Call);
681 self.send_message_with_headers(
682 py,
683 args,
684 kwargs,
685 Some(EitherPortRef::Once(port_ref)),
686 sel!(*),
687 &instance,
688 caller_headers,
689 )?;
690
691 let instance_for_task = instance.clone_for_py();
692 let task: PyPythonTask = PythonTask::new(async move {
693 let _span_guard = span_guard;
694 collect_valuemesh(
695 extent,
696 receiver,
697 method_name,
698 supervision_monitor,
699 &instance_for_task,
700 qualified_endpoint_name,
701 )
702 .await
703 })?
704 .into();
705
706 wrap_in_future(py, task)
707 }
708
709 fn choose<'py>(
711 &self,
712 py: Python<'py>,
713 args: &Bound<'py, PyTuple>,
714 kwargs: Option<&Bound<'py, PyDict>>,
715 ) -> PyResult<Py<PyAny>> {
716 let instance = self.get_current_instance(py)?;
717 let span_guard = self.enter_endpoint_span(EndpointAdverb::Choose, instance.self_addr());
718 let (port_ref, receiver) = self.open_response_port(&instance);
719
720 let caller_headers = self.build_operation_context_headers(EndpointAdverb::Choose);
721 self.send_message_with_headers(
722 py,
723 args,
724 kwargs,
725 Some(EitherPortRef::Unbounded(port_ref)),
726 sel!(?),
727 &instance,
728 caller_headers,
729 )?;
730
731 let task = value_collector(
732 receiver,
733 self.get_method_name().to_string(),
734 self.get_supervision_monitor(),
735 instance.clone_for_py(),
736 self.get_qualified_name(),
737 EndpointAdverb::Choose,
738 span_guard,
739 )?;
740
741 wrap_in_future(py, task)
742 }
743
744 fn call_one<'py>(
746 &self,
747 py: Python<'py>,
748 args: &Bound<'py, PyTuple>,
749 kwargs: Option<&Bound<'py, PyDict>>,
750 ) -> PyResult<Py<PyAny>> {
751 let extent = self.get_extent(py)?;
752
753 if extent.num_ranks() != 1 {
754 return Err(pyo3::exceptions::PyValueError::new_err(format!(
755 "call_one requires exactly 1 actor, but mesh has {}",
756 extent.num_ranks()
757 )));
758 }
759
760 let instance = self.get_current_instance(py)?;
761 let span_guard = self.enter_endpoint_span(EndpointAdverb::CallOne, instance.self_addr());
762 let (port_ref, receiver) = self.open_response_port(&instance);
763
764 let caller_headers = self.build_operation_context_headers(EndpointAdverb::CallOne);
765 self.send_message_with_headers(
766 py,
767 args,
768 kwargs,
769 Some(EitherPortRef::Unbounded(port_ref)),
770 sel!(*),
771 &instance,
772 caller_headers,
773 )?;
774
775 let task = value_collector(
776 receiver,
777 self.get_method_name().to_string(),
778 self.get_supervision_monitor(),
779 instance.clone_for_py(),
780 self.get_qualified_name(),
781 EndpointAdverb::CallOne,
782 span_guard,
783 )?;
784
785 wrap_in_future(py, task)
786 }
787
788 fn stream<'py>(
790 &self,
791 py: Python<'py>,
792 args: &Bound<'py, PyTuple>,
793 kwargs: Option<&Bound<'py, PyDict>>,
794 ) -> PyResult<Py<PyAny>> {
795 let extent = self.get_extent(py)?;
796 let method_name = self.get_method_name().to_string();
797
798 let instance = self.get_current_instance(py)?;
799 let (port_ref, receiver) = self.open_response_port(&instance);
800
801 let caller_headers = self.build_operation_context_headers(EndpointAdverb::Stream);
802 self.send_message_with_headers(
803 py,
804 args,
805 kwargs,
806 Some(EitherPortRef::Unbounded(port_ref)),
807 sel!(*),
808 &instance,
809 caller_headers,
810 )?;
811
812 let actor_count = extent.num_ranks();
813 let start = tokio::time::Instant::now();
814 let supervision_monitor = self.get_supervision_monitor();
815 let qualified_endpoint_name = self.get_qualified_name();
816 let future_class = make_future(py).unbind();
817
818 let attributes = hyperactor_telemetry::kv_pairs!(
819 "method" => method_name.clone()
820 );
821 ENDPOINT_STREAM_THROUGHPUT.add(1, attributes);
822
823 let stream = PyValueStream {
824 receiver: Arc::new(tokio::sync::Mutex::new(receiver)),
825 supervision_monitor,
826 instance: instance.clone_for_py(),
827 remaining: AtomicUsize::new(actor_count),
828 method_name,
829 qualified_endpoint_name,
830 start,
831 actor_count,
832 future_class,
833 };
834
835 Ok(stream.into_pyobject(py)?.unbind().into())
836 }
837
838 fn broadcast<'py>(
840 &self,
841 py: Python<'py>,
842 args: &Bound<'py, PyTuple>,
843 kwargs: Option<&Bound<'py, PyDict>>,
844 ) -> PyResult<()> {
845 let instance = self.get_current_instance(py)?;
846 let method_name = self.get_method_name();
847 let attributes = hyperactor_telemetry::kv_pairs!(
848 "method" => method_name.to_string()
849 );
850
851 match self.send_message(py, args, kwargs, None, sel!(*), &instance) {
852 Ok(()) => {
853 ENDPOINT_BROADCAST_THROUGHPUT.add(1, attributes);
854 Ok(())
855 }
856 Err(e) => {
857 ENDPOINT_BROADCAST_ERROR.add(1, attributes);
858 Err(e)
859 }
860 }
861 }
862}
863
864#[pyclass(
865 name = "ActorEndpoint",
866 module = "monarch._rust_bindings.monarch_hyperactor.endpoint"
867)]
868pub struct ActorEndpoint {
869 inner: Arc<dyn SupervisableActorMesh>,
870 shape: Shape,
871 method: MethodSpecifier,
872 mesh_name: String,
873 signature: Option<Py<PyAny>>,
874 proc_mesh: Option<Py<PyAny>>,
875 propagator: Option<Py<PyAny>>,
876}
877
878impl ActorEndpoint {
879 fn create_message<'py>(
880 &self,
881 py: Python<'py>,
882 args: &Bound<'py, PyTuple>,
883 kwargs: Option<&Bound<'py, PyDict>>,
884 port_ref: Option<EitherPortRef>,
885 ) -> PyResult<PendingMessage> {
886 let port_ref_py: Py<PyAny> = match port_ref {
887 Some(pr) => pr.clone().into_pyobject(py)?.unbind(),
888 None => py.None(),
889 };
890
891 let result = create_endpoint_message(py).call1((
892 self.method.clone(),
893 self.signature
894 .as_ref()
895 .map_or_else(|| py.None(), |s| s.clone_ref(py)),
896 args,
897 kwargs
898 .map_or_else(|| PyDict::new(py), |d| d.clone())
899 .into_any(),
900 port_ref_py,
901 self.proc_mesh
902 .as_ref()
903 .map_or_else(|| py.None(), |p| p.clone_ref(py)),
904 ))?;
905 let mut pending: PyRefMut<'_, PendingMessage> = result.extract()?;
906 pending.take()
907 }
908}
909
910impl Endpoint for ActorEndpoint {
911 fn get_extent(&self, _py: Python<'_>) -> PyResult<Extent> {
912 Ok(self.shape.extent())
913 }
914
915 fn get_method_name(&self) -> &str {
916 self.method.name()
917 }
918
919 fn send_message<'py>(
920 &self,
921 py: Python<'py>,
922 args: &Bound<'py, PyTuple>,
923 kwargs: Option<&Bound<'py, PyDict>>,
924 port_ref: Option<EitherPortRef>,
925 selection: Selection,
926 instance: &Instance<PythonActor>,
927 ) -> PyResult<()> {
928 let message = self.create_message(py, args, kwargs, port_ref)?;
929 self.inner.cast_unresolved(message, selection, instance)
930 }
931
932 fn send_message_with_headers<'py>(
933 &self,
934 py: Python<'py>,
935 args: &Bound<'py, PyTuple>,
936 kwargs: Option<&Bound<'py, PyDict>>,
937 port_ref: Option<EitherPortRef>,
938 selection: Selection,
939 instance: &Instance<PythonActor>,
940 caller_headers: hyperactor_config::Flattrs,
941 ) -> PyResult<()> {
942 let message = self.create_message(py, args, kwargs, port_ref)?;
943 self.inner
944 .cast_unresolved_with_headers(message, selection, instance, caller_headers)
945 }
946
947 fn get_supervision_monitor(&self) -> Option<Arc<dyn Supervisable>> {
948 Some(self.inner.clone())
949 }
950
951 fn get_qualified_name(&self) -> Option<String> {
952 Some(format!("{}.{}()", self.mesh_name, self.method.name()))
953 }
954
955 fn enter_endpoint_span(&self, adverb: EndpointAdverb, actor_id: &ActorAddr) -> SpanGuard {
956 let mesh = self.mesh_name.as_str();
957 let method = self.method.name();
958 SpanGuard::actor_endpoint(adverb.as_str(), actor_id, mesh, method)
959 }
960}
961
962#[pymethods]
963impl ActorEndpoint {
964 #[new]
966 #[pyo3(signature = (actor_mesh, method, shape, mesh_name, signature=None, proc_mesh=None, propagator=None))]
967 fn new(
968 actor_mesh: PythonActorMesh,
969 method: MethodSpecifier,
970 shape: PyShape,
971 mesh_name: String,
972 signature: Option<Py<PyAny>>,
973 proc_mesh: Option<Py<PyAny>>,
974 propagator: Option<Py<PyAny>>,
975 ) -> Self {
976 Self {
977 inner: actor_mesh.get_inner(),
978 shape: shape.get_inner().clone(),
979 method,
980 mesh_name,
981 signature,
982 proc_mesh,
983 propagator,
984 }
985 }
986
987 #[getter]
989 fn _name(&self) -> MethodSpecifier {
990 self.method.clone()
991 }
992
993 #[getter]
995 fn _signature(&self, py: Python<'_>) -> Py<PyAny> {
996 self.signature
997 .clone()
998 .unwrap_or_else(|| py.None().into_any())
999 }
1000
1001 #[getter]
1003 fn _actor_mesh(&self) -> PythonActorMesh {
1004 PythonActorMesh::from_impl(self.inner.clone())
1005 }
1006
1007 fn _propagate<'py>(
1010 &self,
1011 py: Python<'py>,
1012 args: &Bound<'py, PyAny>,
1013 kwargs: &Bound<'py, PyAny>,
1014 fake_args: &Bound<'py, PyAny>,
1015 fake_kwargs: &Bound<'py, PyAny>,
1016 ) -> PyResult<Py<PyAny>> {
1017 let do_propagate = py
1018 .import("monarch._src.actor.endpoint")?
1019 .getattr("_do_propagate")?;
1020 let propagator = self
1021 .propagator
1022 .as_ref()
1023 .map(|p| p.clone_ref(py).into_bound(py))
1024 .unwrap_or_else(|| py.None().into_bound(py));
1025 let cache = PyDict::new(py);
1026 do_propagate
1027 .call1((&propagator, args, kwargs, fake_args, fake_kwargs, cache))?
1028 .extract()
1029 }
1030
1031 fn _fetch_propagate<'py>(
1034 &self,
1035 py: Python<'py>,
1036 args: &Bound<'py, PyAny>,
1037 kwargs: &Bound<'py, PyAny>,
1038 fake_args: &Bound<'py, PyAny>,
1039 fake_kwargs: &Bound<'py, PyAny>,
1040 ) -> PyResult<Py<PyAny>> {
1041 if self.propagator.is_none() {
1042 return Ok(py.None());
1043 }
1044 self._propagate(py, args, kwargs, fake_args, fake_kwargs)
1045 }
1046
1047 fn _pipe_propagate<'py>(
1050 &self,
1051 py: Python<'py>,
1052 args: &Bound<'py, PyAny>,
1053 kwargs: &Bound<'py, PyAny>,
1054 fake_args: &Bound<'py, PyAny>,
1055 fake_kwargs: &Bound<'py, PyAny>,
1056 ) -> PyResult<Py<PyAny>> {
1057 let is_callable = self
1059 .propagator
1060 .as_ref()
1061 .map(|p| p.bind(py).is_callable())
1062 .unwrap_or(false);
1063 if !is_callable {
1064 return Err(pyo3::exceptions::PyValueError::new_err(
1065 "Must specify explicit callable for pipe",
1066 ));
1067 }
1068 self._propagate(py, args, kwargs, fake_args, fake_kwargs)
1069 }
1070
1071 #[pyo3(signature = (*args, **kwargs))]
1073 fn rref<'py>(
1074 slf: PyRef<'py, Self>,
1075 py: Python<'py>,
1076 args: &Bound<'py, PyTuple>,
1077 kwargs: Option<&Bound<'py, PyDict>>,
1078 ) -> PyResult<Py<PyAny>> {
1079 let kwargs_dict = kwargs.map_or_else(|| PyDict::new(py), |d| d.clone());
1080
1081 let result = dispatch_actor_rref(py).call1((slf.into_pyobject(py)?, args, kwargs_dict))?;
1083
1084 Ok(result.unbind())
1085 }
1086
1087 #[pyo3(signature = (*args, **kwargs), name = "call")]
1089 fn py_call<'py>(
1090 &self,
1091 py: Python<'py>,
1092 args: &Bound<'py, PyTuple>,
1093 kwargs: Option<&Bound<'py, PyDict>>,
1094 ) -> PyResult<Py<PyAny>> {
1095 self.call(py, args, kwargs)
1096 }
1097
1098 #[pyo3(signature = (*args, **kwargs), name = "choose")]
1100 fn py_choose<'py>(
1101 &self,
1102 py: Python<'py>,
1103 args: &Bound<'py, PyTuple>,
1104 kwargs: Option<&Bound<'py, PyDict>>,
1105 ) -> PyResult<Py<PyAny>> {
1106 self.choose(py, args, kwargs)
1107 }
1108
1109 #[pyo3(signature = (*args, **kwargs), name = "call_one")]
1111 fn py_call_one<'py>(
1112 &self,
1113 py: Python<'py>,
1114 args: &Bound<'py, PyTuple>,
1115 kwargs: Option<&Bound<'py, PyDict>>,
1116 ) -> PyResult<Py<PyAny>> {
1117 self.call_one(py, args, kwargs)
1118 }
1119
1120 #[pyo3(signature = (*args, **kwargs), name = "stream")]
1122 fn py_stream<'py>(
1123 &self,
1124 py: Python<'py>,
1125 args: &Bound<'py, PyTuple>,
1126 kwargs: Option<&Bound<'py, PyDict>>,
1127 ) -> PyResult<Py<PyAny>> {
1128 self.stream(py, args, kwargs)
1129 }
1130
1131 #[pyo3(signature = (*args, **kwargs), name = "broadcast")]
1133 fn py_broadcast<'py>(
1134 &self,
1135 py: Python<'py>,
1136 args: &Bound<'py, PyTuple>,
1137 kwargs: Option<&Bound<'py, PyDict>>,
1138 ) -> PyResult<()> {
1139 self.broadcast(py, args, kwargs)
1140 }
1141
1142 fn _send<'py>(
1144 &self,
1145 py: Python<'py>,
1146 args: &Bound<'py, PyTuple>,
1147 kwargs: &Bound<'py, PyDict>,
1148 port: Option<EitherPortRef>,
1149 selection: &str,
1150 ) -> PyResult<()> {
1151 let instance = self.get_current_instance(py)?;
1152 let sel = to_hy_sel(selection)?;
1153 self.send_message(py, args, Some(kwargs), port, sel, &instance)
1154 }
1155}
1156
1157#[pyclass(
1162 name = "Remote",
1163 module = "monarch._rust_bindings.monarch_hyperactor.endpoint"
1164)]
1165pub struct Remote {
1166 inner: Py<PyAny>,
1168}
1169
1170impl Endpoint for Remote {
1171 fn get_extent(&self, py: Python<'_>) -> PyResult<Extent> {
1172 let extent: PyExtent = self.inner.call_method0(py, "_get_extent")?.extract(py)?;
1173 Ok(extent.into())
1174 }
1175
1176 fn get_method_name(&self) -> &str {
1177 "unknown"
1178 }
1179
1180 fn send_message<'py>(
1181 &self,
1182 py: Python<'py>,
1183 args: &Bound<'py, PyTuple>,
1184 kwargs: Option<&Bound<'py, PyDict>>,
1185 port_ref: Option<EitherPortRef>,
1186 selection: Selection,
1187 _instance: &Instance<PythonActor>,
1188 ) -> PyResult<()> {
1189 let send_kwargs = PyDict::new(py);
1190 match port_ref {
1191 Some(pr) => send_kwargs.set_item("port", pr.clone())?,
1192 None => send_kwargs.set_item("port", py.None())?,
1193 }
1194
1195 let selection_str = match selection {
1196 Selection::All(inner) if matches!(*inner, Selection::True) => "all",
1197 Selection::Any(inner) if matches!(*inner, Selection::True) => "choose",
1198 _ => {
1199 panic!("only sel!(*) and sel!(?) should be provided as selection for send_message")
1200 }
1201 };
1202
1203 send_kwargs.set_item("selection", selection_str)?;
1204
1205 let kwargs_dict = kwargs.map_or_else(|| PyDict::new(py), |d| d.clone());
1206 self.inner
1207 .call_method(py, "_send", (args.clone(), kwargs_dict), Some(&send_kwargs))?;
1208
1209 Ok(())
1210 }
1211
1212 fn get_supervision_monitor(&self) -> Option<Arc<dyn Supervisable>> {
1213 None }
1215
1216 fn get_qualified_name(&self) -> Option<String> {
1217 None }
1219
1220 fn enter_endpoint_span(&self, adverb: EndpointAdverb, actor_id: &ActorAddr) -> SpanGuard {
1221 let call_name = Python::attach(|py| {
1222 self.inner
1223 .call_method0(py, "_call_name")
1224 .ok()
1225 .and_then(|v| v.extract::<String>(py).ok())
1226 });
1227 let call_name = call_name.as_deref().unwrap_or("");
1228 SpanGuard::remote(adverb.as_str(), actor_id, call_name)
1229 }
1230}
1231
1232#[pymethods]
1233impl Remote {
1234 #[new]
1236 fn new(remote: Py<PyAny>) -> Self {
1237 Self { inner: remote }
1238 }
1239
1240 #[pyo3(signature = (*args, **kwargs), name = "call")]
1242 fn py_call<'py>(
1243 &self,
1244 py: Python<'py>,
1245 args: &Bound<'py, PyTuple>,
1246 kwargs: Option<&Bound<'py, PyDict>>,
1247 ) -> PyResult<Py<PyAny>> {
1248 self.call(py, args, kwargs)
1249 }
1250
1251 #[pyo3(signature = (*args, **kwargs), name = "choose")]
1253 fn py_choose<'py>(
1254 &self,
1255 py: Python<'py>,
1256 args: &Bound<'py, PyTuple>,
1257 kwargs: Option<&Bound<'py, PyDict>>,
1258 ) -> PyResult<Py<PyAny>> {
1259 self.choose(py, args, kwargs)
1260 }
1261
1262 #[pyo3(signature = (*args, **kwargs), name = "call_one")]
1264 fn py_call_one<'py>(
1265 &self,
1266 py: Python<'py>,
1267 args: &Bound<'py, PyTuple>,
1268 kwargs: Option<&Bound<'py, PyDict>>,
1269 ) -> PyResult<Py<PyAny>> {
1270 self.call_one(py, args, kwargs)
1271 }
1272
1273 #[pyo3(signature = (*args, **kwargs), name = "stream")]
1275 fn py_stream<'py>(
1276 &self,
1277 py: Python<'py>,
1278 args: &Bound<'py, PyTuple>,
1279 kwargs: Option<&Bound<'py, PyDict>>,
1280 ) -> PyResult<Py<PyAny>> {
1281 self.stream(py, args, kwargs)
1282 }
1283
1284 #[pyo3(signature = (*args, **kwargs), name = "broadcast")]
1286 fn py_broadcast<'py>(
1287 &self,
1288 py: Python<'py>,
1289 args: &Bound<'py, PyTuple>,
1290 kwargs: Option<&Bound<'py, PyDict>>,
1291 ) -> PyResult<()> {
1292 self.broadcast(py, args, kwargs)
1293 }
1294
1295 #[pyo3(signature = (*args, **kwargs))]
1297 fn rref<'py>(
1298 &self,
1299 py: Python<'py>,
1300 args: &Bound<'py, PyTuple>,
1301 kwargs: Option<&Bound<'py, PyDict>>,
1302 ) -> PyResult<Py<PyAny>> {
1303 let kwargs_dict = kwargs.map_or_else(|| PyDict::new(py), |d| d.clone());
1304 self.inner.call_method(py, "rref", args, Some(&kwargs_dict))
1305 }
1306
1307 fn _call_name(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1309 self.inner.call_method0(py, "_call_name")
1310 }
1311
1312 #[getter]
1314 fn _maybe_resolvable(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1315 self.inner.getattr(py, "_maybe_resolvable")
1316 }
1317
1318 #[getter]
1320 fn _resolvable(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1321 self.inner.getattr(py, "_resolvable")
1322 }
1323
1324 #[getter]
1327 fn _remote_impl(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1328 self.inner.getattr(py, "_remote_impl")
1329 }
1330
1331 fn _propagate<'py>(
1334 &self,
1335 py: Python<'py>,
1336 args: &Bound<'py, PyAny>,
1337 kwargs: &Bound<'py, PyAny>,
1338 fake_args: &Bound<'py, PyAny>,
1339 fake_kwargs: &Bound<'py, PyAny>,
1340 ) -> PyResult<Py<PyAny>> {
1341 self.inner
1342 .call_method1(py, "_propagate", (args, kwargs, fake_args, fake_kwargs))
1343 }
1344
1345 fn _fetch_propagate<'py>(
1348 &self,
1349 py: Python<'py>,
1350 args: &Bound<'py, PyAny>,
1351 kwargs: &Bound<'py, PyAny>,
1352 fake_args: &Bound<'py, PyAny>,
1353 fake_kwargs: &Bound<'py, PyAny>,
1354 ) -> PyResult<Py<PyAny>> {
1355 self.inner.call_method1(
1356 py,
1357 "_fetch_propagate",
1358 (args, kwargs, fake_args, fake_kwargs),
1359 )
1360 }
1361
1362 fn _pipe_propagate<'py>(
1365 &self,
1366 py: Python<'py>,
1367 args: &Bound<'py, PyAny>,
1368 kwargs: &Bound<'py, PyAny>,
1369 fake_args: &Bound<'py, PyAny>,
1370 fake_kwargs: &Bound<'py, PyAny>,
1371 ) -> PyResult<Py<PyAny>> {
1372 self.inner.call_method1(
1373 py,
1374 "_pipe_propagate",
1375 (args, kwargs, fake_args, fake_kwargs),
1376 )
1377 }
1378
1379 fn _send<'py>(
1382 &self,
1383 py: Python<'py>,
1384 args: &Bound<'py, PyTuple>,
1385 kwargs: &Bound<'py, PyDict>,
1386 port: Option<Py<PyAny>>,
1387 selection: &str,
1388 ) -> PyResult<()> {
1389 self.inner.call_method(
1390 py,
1391 "_send",
1392 (args, kwargs),
1393 Some(&{
1394 let d = PyDict::new(py);
1395 d.set_item("port", port.unwrap_or_else(|| py.None()))?;
1396 d.set_item("selection", selection)?;
1397 d
1398 }),
1399 )?;
1400 Ok(())
1401 }
1402
1403 #[pyo3(signature = (*args, **kwargs))]
1405 fn __call__<'py>(
1406 &self,
1407 py: Python<'py>,
1408 args: &Bound<'py, PyTuple>,
1409 kwargs: Option<&Bound<'py, PyDict>>,
1410 ) -> PyResult<Py<PyAny>> {
1411 self.rref(py, args, kwargs)
1412 }
1413}
1414
1415pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
1416 module.add_class::<PyValueStream>()?;
1417 module.add_class::<ActorEndpoint>()?;
1418 module.add_class::<Remote>()?;
1419
1420 Ok(())
1421}
1422
1423#[derive(Named)]
1424struct PythonResponseMessageReducer;
1425
1426impl CommReducer for PythonResponseMessageReducer {
1427 type Update = PythonMessage;
1428
1429 fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
1430 Ok(ValueOverlay::try_from_runs(rle::merge_value_runs(
1431 left.into_overlay()?.into_runs(),
1432 right.into_overlay()?.into_runs(),
1433 ))?
1434 .into())
1435 }
1436}
1437
1438inventory::submit! {
1439 ReducerFactory {
1440 typehash_f: <PythonResponseMessageReducer as Named>::typehash,
1441 builder_f: |_| Ok(Box::new(PythonResponseMessageReducer)),
1442 }
1443}
1444
1445struct PythonResponseMessageAccumulator;
1446
1447impl Accumulator for PythonResponseMessageAccumulator {
1448 type State = PythonMessage;
1449 type Update = PythonMessage;
1450
1451 fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> {
1452 *state = ValueOverlay::try_from_runs(rle::merge_value_runs(
1453 std::mem::take(state).into_overlay()?.into_runs(),
1454 update.into_overlay()?.into_runs(),
1455 ))?
1456 .into();
1457
1458 Ok(())
1459 }
1460
1461 fn reducer_spec(&self) -> Option<ReducerSpec> {
1462 Some(ReducerSpec {
1463 typehash: <PythonResponseMessageReducer as Named>::typehash(),
1464 builder_params: None,
1465 })
1466 }
1467}
1468
1469#[cfg(test)]
1470mod tests {
1471 use hyperactor::ActorAddr;
1472 use hyperactor::mailbox::headers::OPERATION_ADVERB;
1473 use hyperactor::mailbox::headers::OPERATION_ENDPOINT;
1474
1475 use super::*;
1476
1477 struct TestEndpoint {
1481 qualified_name: Option<String>,
1482 }
1483
1484 impl Endpoint for TestEndpoint {
1485 fn get_extent(&self, _py: Python<'_>) -> PyResult<Extent> {
1486 unreachable!()
1487 }
1488 fn get_method_name(&self) -> &str {
1489 unreachable!()
1490 }
1491 fn send_message<'py>(
1492 &self,
1493 _py: Python<'py>,
1494 _args: &Bound<'py, PyTuple>,
1495 _kwargs: Option<&Bound<'py, PyDict>>,
1496 _port_ref: Option<EitherPortRef>,
1497 _selection: Selection,
1498 _instance: &Instance<PythonActor>,
1499 ) -> PyResult<()> {
1500 unreachable!()
1501 }
1502 fn get_supervision_monitor(&self) -> Option<Arc<dyn Supervisable>> {
1503 None
1504 }
1505 fn get_qualified_name(&self) -> Option<String> {
1506 self.qualified_name.clone()
1507 }
1508 fn enter_endpoint_span(&self, _adverb: EndpointAdverb, _actor_id: &ActorAddr) -> SpanGuard {
1509 unreachable!()
1510 }
1511 }
1512
1513 #[test]
1517 fn test_rc1_build_operation_context_headers_stamps_each_adverb() {
1518 let ep = TestEndpoint {
1519 qualified_name: Some("training.Philosopher.ping()".to_string()),
1520 };
1521 for (adverb, expected_adverb) in [
1522 (EndpointAdverb::Call, "call"),
1523 (EndpointAdverb::CallOne, "call_one"),
1524 (EndpointAdverb::Choose, "choose"),
1525 (EndpointAdverb::Stream, "stream"),
1526 ] {
1527 let headers = ep.build_operation_context_headers(adverb);
1528 assert_eq!(
1529 headers.get(OPERATION_ENDPOINT).as_deref(),
1530 Some("training.Philosopher.ping()"),
1531 "adverb {:?}: OPERATION_ENDPOINT",
1532 adverb,
1533 );
1534 assert_eq!(
1535 headers.get(OPERATION_ADVERB).as_deref(),
1536 Some(expected_adverb),
1537 "adverb {:?}: OPERATION_ADVERB",
1538 adverb,
1539 );
1540 }
1541 }
1542
1543 #[test]
1548 fn test_rc1_build_operation_context_headers_omits_endpoint_when_no_qualified_name() {
1549 let ep = TestEndpoint {
1550 qualified_name: None,
1551 };
1552 let headers = ep.build_operation_context_headers(EndpointAdverb::CallOne);
1553 assert!(
1554 headers.get(OPERATION_ENDPOINT).is_none(),
1555 "OPERATION_ENDPOINT must be absent when endpoint has no qualified name",
1556 );
1557 assert_eq!(
1558 headers.get(OPERATION_ADVERB).as_deref(),
1559 Some("call_one"),
1560 "OPERATION_ADVERB should still be stamped",
1561 );
1562 }
1563}