hyperactor/
context.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
//! This module defines the traits that are used as context arguments to various
//! hyperactor APIs. The most common one is [`crate::context::Actor`], which is
//! implemented by [`crate::proc::Context`] (provided to actor handlers) and by
//! [`crate::proc::Instance`] (representing a running actor instance).
//!
//! Context traits are sealed, and thus can only be implemented by data types in the
//! core hyperactor crate.
16
17use std::mem::take;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::sync::OnceLock;
21
22use async_trait::async_trait;
23use backoff::ExponentialBackoffBuilder;
24use backoff::backoff::Backoff;
25use dashmap::DashSet;
26use hyperactor_config::attrs::Attrs;
27
28use crate::ActorId;
29use crate::Instance;
30use crate::PortId;
31use crate::accum;
32use crate::accum::ErasedCommReducer;
33use crate::accum::ReducerOpts;
34use crate::accum::ReducerSpec;
35use crate::config;
36use crate::mailbox;
37use crate::mailbox::MailboxSender;
38use crate::mailbox::MessageEnvelope;
39use crate::time::Alarm;
40
41/// A mailbox context provides a mailbox.
42pub trait Mailbox: crate::private::Sealed + Send + Sync {
43    /// The mailbox associated with this context
44    fn mailbox(&self) -> &crate::Mailbox;
45}
46
47/// A typed actor context, providing both a [`Mailbox`] and an [`Instance`].
48///
49/// Note: Send and Sync markers are here only temporarily in order to bridge
50/// the transition to the context types, away from the [`crate::cap`] module.
51#[async_trait]
52pub trait Actor: Mailbox {
53    /// The type of actor associated with this context.
54    type A: crate::Actor;
55
56    /// The instance associated with this context.
57    fn instance(&self) -> &Instance<Self::A>;
58}
59
60/// An internal extension trait for Mailbox contexts.
61/// TODO: consider moving this to another module.
62pub(crate) trait MailboxExt: Mailbox {
63    /// Post a message to the provided destination with the provided headers, and data.
64    /// All messages posted from actors should use this implementation.
65    fn post(&self, dest: PortId, headers: Attrs, data: wirevalue::Any, return_undeliverable: bool);
66
67    /// Split a port, using a provided reducer spec, if provided.
68    fn split(
69        &self,
70        port_id: PortId,
71        reducer_spec: Option<ReducerSpec>,
72        reducer_opts: Option<ReducerOpts>,
73        return_undeliverable: bool,
74    ) -> anyhow::Result<PortId>;
75}
76
77// Tracks mailboxes that have emitted a `CanSend::post` warning due to
78// missing an `Undeliverable<MessageEnvelope>` binding. In this
79// context, mailboxes are few and long-lived; unbounded growth is not
80// a realistic concern.
81static CAN_SEND_WARNED_MAILBOXES: OnceLock<DashSet<ActorId>> = OnceLock::new();
82
83/// Only actors CanSend because they need a return port.
84impl<T: Actor + Send + Sync> MailboxExt for T {
85    fn post(&self, dest: PortId, headers: Attrs, data: wirevalue::Any, return_undeliverable: bool) {
86        let return_handle = self.mailbox().bound_return_handle().unwrap_or_else(|| {
87            let actor_id = self.mailbox().actor_id();
88            if CAN_SEND_WARNED_MAILBOXES
89                .get_or_init(DashSet::new)
90                .insert(actor_id.clone())
91            {
92                let bt = std::backtrace::Backtrace::force_capture();
93                tracing::warn!(
94                    actor_id = ?actor_id,
95                    backtrace = ?bt,
96                    "mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
97                );
98            }
99            mailbox::monitored_return_handle()
100        });
101
102        let mut envelope =
103            MessageEnvelope::new(self.mailbox().actor_id().clone(), dest, data, headers);
104        envelope.set_return_undeliverable(return_undeliverable);
105        MailboxSender::post(self.mailbox(), envelope, return_handle);
106    }
107
108    fn split(
109        &self,
110        port_id: PortId,
111        reducer_spec: Option<ReducerSpec>,
112        reducer_opts: Option<ReducerOpts>,
113        return_undeliverable: bool,
114    ) -> anyhow::Result<PortId> {
115        fn post(
116            mailbox: &mailbox::Mailbox,
117            port_id: PortId,
118            msg: wirevalue::Any,
119            return_undeliverable: bool,
120        ) {
121            let mut envelope =
122                MessageEnvelope::new(mailbox.actor_id().clone(), port_id, msg, Attrs::new());
123            envelope.set_return_undeliverable(return_undeliverable);
124            mailbox::MailboxSender::post(
125                mailbox,
126                envelope,
127                // TODO(pzhang) figure out how to use upstream's return handle,
128                // instead of getting a new one like this.
129                // This is okay for now because upstream is currently also using
130                // the same handle singleton, but that could change in the future.
131                mailbox::monitored_return_handle(),
132            );
133        }
134
135        let port_index = self.mailbox().allocate_port();
136        let split_port = self.mailbox().actor_id().port_id(port_index);
137        let mailbox = self.mailbox().clone();
138        let reducer = reducer_spec
139            .map(
140                |ReducerSpec {
141                     typehash,
142                     builder_params,
143                 }| { accum::resolve_reducer(typehash, builder_params) },
144            )
145            .transpose()?
146            .flatten();
147        let enqueue: Box<
148            dyn Fn(wirevalue::Any) -> Result<(), (wirevalue::Any, anyhow::Error)> + Send + Sync,
149        > = match reducer {
150            None => Box::new(move |serialized: wirevalue::Any| {
151                post(&mailbox, port_id.clone(), serialized, return_undeliverable);
152                Ok(())
153            }),
154            Some(reducer) => {
155                let buffer: Arc<Mutex<UpdateBuffer>> =
156                    Arc::new(Mutex::new(UpdateBuffer::new(reducer)));
157
158                let alarm = Alarm::new();
159
160                {
161                    let mut sleeper = alarm.sleeper();
162                    let buffer = Arc::clone(&buffer);
163                    let port_id = port_id.clone();
164                    let mailbox = mailbox.clone();
165                    tokio::spawn(async move {
166                        while sleeper.sleep().await {
167                            let mut buf = buffer.lock().unwrap();
168                            match buf.reduce() {
169                                None => (),
170                                Some(Ok(reduced)) => {
171                                    post(&mailbox, port_id.clone(), reduced, return_undeliverable)
172                                }
173                                // We simply ignore errors here, and let them be propagated
174                                // later in the enqueueing function.
175                                //
176                                // If this is the last update, then this strategy will cause a hang.
177                                // We should obtain a supervisor here from our send context and notify
178                                // it.
179                                Some(Err(e)) => tracing::error!(
180                                    "error while reducing update: {}; waiting until the next send to propagate",
181                                    e
182                                ),
183                            }
184                        }
185                    });
186                }
187
188                // Note: alarm is held in the closure while the port is active;
189                // when it is dropped, the alarm terminates, and so does the sleeper
190                // task.
191                let alarm = Mutex::new(alarm);
192
193                // Default to global configuration if not specified.
194                let reducer_opts = reducer_opts.unwrap_or_default();
195                let max_interval = reducer_opts.max_update_interval();
196                let initial_interval = reducer_opts.initial_update_interval();
197
198                // Create exponential backoff for buffer flush interval, starting at
199                // initial_interval and growing to max_interval
200                let backoff = Mutex::new(
201                    ExponentialBackoffBuilder::new()
202                        .with_initial_interval(initial_interval)
203                        .with_multiplier(2.0)
204                        .with_max_interval(max_interval)
205                        .with_max_elapsed_time(None)
206                        .build(),
207                );
208
209                Box::new(move |update: wirevalue::Any| {
210                    // Hold the lock until messages are sent. This is to avoid another
211                    // invocation of this method trying to send message concurrently and
212                    // cause messages delivered out of order.
213                    //
214                    // We also always acquire alarm *after* the buffer, to avoid deadlocks.
215                    let mut buf = buffer.lock().unwrap();
216                    match buf.push(update) {
217                        None => {
218                            let interval = backoff.lock().unwrap().next_backoff().unwrap();
219                            alarm.lock().unwrap().rearm(interval);
220                            Ok(())
221                        }
222                        Some(Ok(reduced)) => {
223                            alarm.lock().unwrap().disarm();
224                            post(&mailbox, port_id.clone(), reduced, return_undeliverable);
225                            Ok(())
226                        }
227                        Some(Err(e)) => Err((buf.pop().unwrap(), e)),
228                    }
229                })
230            }
231        };
232        self.mailbox().bind_untyped(
233            &split_port,
234            mailbox::UntypedUnboundedSender {
235                sender: enqueue,
236                port_id: split_port.clone(),
237            },
238        );
239        Ok(split_port)
240    }
241}
242
243struct UpdateBuffer {
244    buffered: Vec<wirevalue::Any>,
245    reducer: Box<dyn ErasedCommReducer + Send + Sync + 'static>,
246}
247
248impl UpdateBuffer {
249    fn new(reducer: Box<dyn ErasedCommReducer + Send + Sync + 'static>) -> Self {
250        Self {
251            buffered: Vec::new(),
252            reducer,
253        }
254    }
255
256    fn is_empty(&self) -> bool {
257        self.buffered.is_empty()
258    }
259
260    fn pop(&mut self) -> Option<wirevalue::Any> {
261        self.buffered.pop()
262    }
263
264    /// Push a new item to the buffer, and optionally return any items that should
265    /// be flushed.
266    fn push(&mut self, serialized: wirevalue::Any) -> Option<anyhow::Result<wirevalue::Any>> {
267        let limit = hyperactor_config::global::get(config::SPLIT_MAX_BUFFER_SIZE);
268
269        self.buffered.push(serialized);
270        if self.buffered.len() >= limit {
271            self.reduce()
272        } else {
273            None
274        }
275    }
276
277    fn reduce(&mut self) -> Option<anyhow::Result<wirevalue::Any>> {
278        if self.buffered.is_empty() {
279            None
280        } else {
281            match self.reducer.reduce_updates(take(&mut self.buffered)) {
282                Ok(reduced) => Some(Ok(reduced)),
283                Err((e, b)) => {
284                    self.buffered = b;
285                    Some(Err(e))
286                }
287            }
288        }
289    }
290}