hyperactor/
context.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! This module defines traits that are used as context arguments to various
10//! hyperactor APIs; usually [`crate::context::Actor`], implemented by
11//! [`crate::proc::Context`] (provided to actor handlers) and [`crate::proc::Instance`],
12//! representing a running actor instance.
13//!
14//! Context traits are sealed, and thus can only be implemented by data types in the
15//! core hyperactor crate.
16
17use std::mem::take;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::sync::OnceLock;
21
22use async_trait::async_trait;
23use dashmap::DashSet;
24
25use crate::ActorId;
26use crate::Instance;
27use crate::PortId;
28use crate::accum;
29use crate::accum::ErasedCommReducer;
30use crate::accum::ReducerOpts;
31use crate::accum::ReducerSpec;
32use crate::attrs::Attrs;
33use crate::config;
34use crate::data::Serialized;
35use crate::mailbox;
36use crate::mailbox::MailboxSender;
37use crate::mailbox::MessageEnvelope;
38use crate::time::Alarm;
39
/// A mailbox context provides a mailbox.
///
/// The trait is sealed through [`crate::private::Sealed`], and every
/// implementor is required to be `Send + Sync`.
pub trait Mailbox: crate::private::Sealed + Send + Sync {
    /// Returns a reference to the mailbox associated with this context.
    fn mailbox(&self) -> &crate::Mailbox;
}
45
/// A typed actor context, providing both a [`Mailbox`] and an [`Instance`].
///
/// Note: the `Send` and `Sync` markers (required through the [`Mailbox`]
/// supertrait) are here only temporarily, in order to bridge the transition
/// to the context types, away from the [`crate::cap`] module.
#[async_trait]
pub trait Actor: Mailbox {
    /// The type of actor associated with this context.
    type A: crate::Actor;

    /// Returns the [`Instance`] associated with this context.
    fn instance(&self) -> &Instance<Self::A>;
}
58
/// An internal extension trait for Mailbox contexts.
///
/// It adds message posting and port splitting on top of [`Mailbox`].
/// TODO: consider moving this to another module.
pub(crate) trait MailboxExt: Mailbox {
    /// Post a message to the provided destination with the provided headers, and data.
    /// All messages posted from actors should use this implementation.
    ///
    /// * `dest` - the port the message is addressed to.
    /// * `headers` - attributes attached to the message envelope.
    /// * `data` - the serialized message payload.
    /// * `return_undeliverable` - forwarded to
    ///   `MessageEnvelope::set_return_undeliverable` on the envelope that is
    ///   posted.
    fn post(&self, dest: PortId, headers: Attrs, data: Serialized, return_undeliverable: bool);

    /// Split a port, using a provided reducer spec, if provided.
    ///
    /// A new port is allocated on this context's mailbox; messages sent to it
    /// are forwarded to `port_id`. When `reducer_spec` resolves to a reducer,
    /// updates are buffered and reduced before being forwarded.
    ///
    /// * `port_id` - the port that forwarded messages are posted to.
    /// * `reducer_spec` - optional description of the reducer to resolve.
    /// * `reducer_opts` - optional reducer settings; the default options are
    ///   used when `None`.
    /// * `return_undeliverable` - applied to every envelope that is forwarded.
    ///
    /// Returns the id of the newly bound split port.
    ///
    /// # Errors
    ///
    /// Returns an error if the reducer described by `reducer_spec` cannot be
    /// resolved.
    fn split(
        &self,
        port_id: PortId,
        reducer_spec: Option<ReducerSpec>,
        reducer_opts: Option<ReducerOpts>,
        return_undeliverable: bool,
    ) -> anyhow::Result<PortId>;
}
75
// Tracks the actors whose mailbox has already emitted the
// `MailboxExt::post` warning about a missing
// `Undeliverable<MessageEnvelope>` binding, so that the warning is
// logged only once per actor. The set is created lazily on first use.
// In this context, mailboxes are few and long-lived; unbounded growth
// is not a realistic concern.
// NOTE(review): the `CAN_SEND` in the name presumably refers to an
// earlier `CanSend` API -- confirm before renaming.
static CAN_SEND_WARNED_MAILBOXES: OnceLock<DashSet<ActorId>> = OnceLock::new();
81
82/// Only actors CanSend because they need a return port.
83impl<T: Actor + Send + Sync> MailboxExt for T {
84    fn post(&self, dest: PortId, headers: Attrs, data: Serialized, return_undeliverable: bool) {
85        let return_handle = self.mailbox().bound_return_handle().unwrap_or_else(|| {
86            let actor_id = self.mailbox().actor_id();
87            if CAN_SEND_WARNED_MAILBOXES
88                .get_or_init(DashSet::new)
89                .insert(actor_id.clone())
90            {
91                let bt = std::backtrace::Backtrace::force_capture();
92                tracing::warn!(
93                    actor_id = ?actor_id,
94                    backtrace = ?bt,
95                    "mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
96                );
97            }
98            mailbox::monitored_return_handle()
99        });
100
101        let mut envelope =
102            MessageEnvelope::new(self.mailbox().actor_id().clone(), dest, data, headers);
103        envelope.set_return_undeliverable(return_undeliverable);
104        MailboxSender::post(self.mailbox(), envelope, return_handle);
105    }
106
107    fn split(
108        &self,
109        port_id: PortId,
110        reducer_spec: Option<ReducerSpec>,
111        reducer_opts: Option<ReducerOpts>,
112        return_undeliverable: bool,
113    ) -> anyhow::Result<PortId> {
114        fn post(
115            mailbox: &mailbox::Mailbox,
116            port_id: PortId,
117            msg: Serialized,
118            return_undeliverable: bool,
119        ) {
120            let mut envelope =
121                MessageEnvelope::new(mailbox.actor_id().clone(), port_id, msg, Attrs::new());
122            envelope.set_return_undeliverable(return_undeliverable);
123            mailbox::MailboxSender::post(
124                mailbox,
125                envelope,
126                // TODO(pzhang) figure out how to use upstream's return handle,
127                // instead of getting a new one like this.
128                // This is okay for now because upstream is currently also using
129                // the same handle singleton, but that could change in the future.
130                mailbox::monitored_return_handle(),
131            );
132        }
133
134        let port_index = self.mailbox().allocate_port();
135        let split_port = self.mailbox().actor_id().port_id(port_index);
136        let mailbox = self.mailbox().clone();
137        let reducer = reducer_spec
138            .map(
139                |ReducerSpec {
140                     typehash,
141                     builder_params,
142                 }| { accum::resolve_reducer(typehash, builder_params) },
143            )
144            .transpose()?
145            .flatten();
146        let enqueue: Box<
147            dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync,
148        > = match reducer {
149            None => Box::new(move |serialized: Serialized| {
150                post(&mailbox, port_id.clone(), serialized, return_undeliverable);
151                Ok(())
152            }),
153            Some(reducer) => {
154                let buffer: Arc<Mutex<UpdateBuffer>> =
155                    Arc::new(Mutex::new(UpdateBuffer::new(reducer)));
156
157                let alarm = Alarm::new();
158
159                {
160                    let mut sleeper = alarm.sleeper();
161                    let buffer = Arc::clone(&buffer);
162                    let port_id = port_id.clone();
163                    let mailbox = mailbox.clone();
164                    tokio::spawn(async move {
165                        while sleeper.sleep().await {
166                            let mut buf = buffer.lock().unwrap();
167                            match buf.reduce() {
168                                None => (),
169                                Some(Ok(reduced)) => {
170                                    post(&mailbox, port_id.clone(), reduced, return_undeliverable)
171                                }
172                                // We simply ignore errors here, and let them be propagated
173                                // later in the enqueueing function.
174                                //
175                                // If this is the last update, then this strategy will cause a hang.
176                                // We should obtain a supervisor here from our send context and notify
177                                // it.
178                                Some(Err(e)) => tracing::error!(
179                                    "error while reducing update: {}; waiting until the next send to propagate",
180                                    e
181                                ),
182                            }
183                        }
184                    });
185                }
186
187                // Note: alarm is held in the closure while the port is active;
188                // when it is dropped, the alarm terminates, and so does the sleeper
189                // task.
190                let alarm = Mutex::new(alarm);
191
192                // Default to global configuration if not specified.
193                let reducer_opts = reducer_opts.unwrap_or_default();
194
195                Box::new(move |update: Serialized| {
196                    // Hold the lock until messages are sent. This is to avoid another
197                    // invocation of this method trying to send message concurrently and
198                    // cause messages delivered out of order.
199                    //
200                    // We also always acquire alarm *after* the buffer, to avoid deadlocks.
201                    let mut buf = buffer.lock().unwrap();
202                    let was_empty = buf.is_empty();
203                    match buf.push(update) {
204                        None if was_empty => {
205                            alarm
206                                .lock()
207                                .unwrap()
208                                .arm(reducer_opts.max_update_interval());
209                            Ok(())
210                        }
211                        None => Ok(()),
212                        Some(Ok(reduced)) => {
213                            alarm.lock().unwrap().disarm();
214                            post(&mailbox, port_id.clone(), reduced, return_undeliverable);
215                            Ok(())
216                        }
217                        Some(Err(e)) => Err((buf.pop().unwrap(), e)),
218                    }
219                })
220            }
221        };
222        self.mailbox().bind_untyped(
223            &split_port,
224            mailbox::UntypedUnboundedSender {
225                sender: enqueue,
226                port_id: split_port.clone(),
227            },
228        );
229        Ok(split_port)
230    }
231}
232
233struct UpdateBuffer {
234    buffered: Vec<Serialized>,
235    reducer: Box<dyn ErasedCommReducer + Send + Sync + 'static>,
236}
237
238impl UpdateBuffer {
239    fn new(reducer: Box<dyn ErasedCommReducer + Send + Sync + 'static>) -> Self {
240        Self {
241            buffered: Vec::new(),
242            reducer,
243        }
244    }
245
246    fn is_empty(&self) -> bool {
247        self.buffered.is_empty()
248    }
249
250    fn pop(&mut self) -> Option<Serialized> {
251        self.buffered.pop()
252    }
253
254    /// Push a new item to the buffer, and optionally return any items that should
255    /// be flushed.
256    fn push(&mut self, serialized: Serialized) -> Option<anyhow::Result<Serialized>> {
257        let limit = config::global::get(config::SPLIT_MAX_BUFFER_SIZE);
258
259        self.buffered.push(serialized);
260        if self.buffered.len() >= limit {
261            self.reduce()
262        } else {
263            None
264        }
265    }
266
267    fn reduce(&mut self) -> Option<anyhow::Result<Serialized>> {
268        if self.buffered.is_empty() {
269            None
270        } else {
271            match self.reducer.reduce_updates(take(&mut self.buffered)) {
272                Ok(reduced) => Some(Ok(reduced)),
273                Err((e, b)) => {
274                    self.buffered = b;
275                    Some(Err(e))
276                }
277            }
278        }
279    }
280}