hyperactor/
context.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! This module defines traits that are used as context arguments to various
10//! hyperactor APIs; usually [`crate::context::Actor`], implemented by
11//! [`crate::proc::Context`] (provided to actor handlers) and [`crate::proc::Instance`],
12//! representing a running actor instance.
13//!
14//! Context traits are sealed, and thus can only be implemented by data types in the
15//! core hyperactor crate.
16
17use std::mem::take;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::sync::OnceLock;
21
22use async_trait::async_trait;
23use dashmap::DashSet;
24
25use crate::ActorId;
26use crate::Instance;
27use crate::PortId;
28use crate::accum;
29use crate::accum::ErasedCommReducer;
30use crate::accum::ReducerOpts;
31use crate::accum::ReducerSpec;
32use crate::attrs::Attrs;
33use crate::config;
34use crate::data::Serialized;
35use crate::mailbox;
36use crate::mailbox::MailboxSender;
37use crate::mailbox::MessageEnvelope;
38use crate::time::Alarm;
39
40/// A mailbox context provides a mailbox.
41pub trait Mailbox: crate::private::Sealed + Send + Sync {
42    /// The mailbox associated with this context
43    fn mailbox(&self) -> &crate::Mailbox;
44}
45
46/// A typed actor context, providing both a [`Mailbox`] and an [`Instance`].
47///
48/// Note: Send and Sync markers are here only temporarily in order to bridge
49/// the transition to the context types, away from the [`crate::cap`] module.
50#[async_trait]
51pub trait Actor: Mailbox {
52    /// The type of actor associated with this context.
53    type A: crate::Actor;
54
55    /// The instance associated with this context.
56    fn instance(&self) -> &Instance<Self::A>;
57}
58
59/// An internal extension trait for Mailbox contexts.
60/// TODO: consider moving this to another module.
61pub(crate) trait MailboxExt: Mailbox {
62    /// Post a message to the provided destination with the provided headers, and data.
63    /// All messages posted from actors should use this implementation.
64    fn post(&self, dest: PortId, headers: Attrs, data: Serialized);
65
66    /// Split a port, using a provided reducer spec, if provided.
67    fn split(
68        &self,
69        port_id: PortId,
70        reducer_spec: Option<ReducerSpec>,
71        reducer_opts: Option<ReducerOpts>,
72    ) -> anyhow::Result<PortId>;
73}
74
75// Tracks mailboxes that have emitted a `CanSend::post` warning due to
76// missing an `Undeliverable<MessageEnvelope>` binding. In this
77// context, mailboxes are few and long-lived; unbounded growth is not
78// a realistic concern.
79static CAN_SEND_WARNED_MAILBOXES: OnceLock<DashSet<ActorId>> = OnceLock::new();
80
/// Blanket implementation giving every [`Mailbox`] context the [`MailboxExt`]
/// operations.
///
/// Posting requires a return handle for undeliverable messages. `post` uses
/// the mailbox's bound `Undeliverable<MessageEnvelope>` handle when one
/// exists, and otherwise falls back to [`mailbox::monitored_return_handle`].
impl<T: Mailbox + Send + Sync> MailboxExt for T {
    /// Wraps `data` and `headers` in a [`MessageEnvelope`] sent from this
    /// mailbox's actor to `dest`, and posts it through the mailbox.
    ///
    /// If no return handle is bound, a warning with a captured backtrace is
    /// logged (once per actor id) and the monitored return handle is used.
    fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
        let return_handle = self.mailbox().bound_return_handle().unwrap_or_else(|| {
            let actor_id = self.mailbox().actor_id();
            // `DashSet::insert` returns true only if the id was not already
            // present, so the backtrace capture and warning happen once per
            // actor id.
            if CAN_SEND_WARNED_MAILBOXES
                .get_or_init(DashSet::new)
                .insert(actor_id.clone())
            {
                let bt = std::backtrace::Backtrace::force_capture();
                tracing::warn!(
                    actor_id = ?actor_id,
                    backtrace = ?bt,
                    "mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
                );
            }
            mailbox::monitored_return_handle()
        });

        let envelope = MessageEnvelope::new(self.mailbox().actor_id().clone(), dest, data, headers);
        MailboxSender::post(self.mailbox(), envelope, return_handle);
    }

    /// Allocates and binds a new port on this mailbox that forwards to
    /// `port_id`, and returns the new port's id.
    ///
    /// The update path depends on whether a reducer is available:
    /// - Without a reducer (`reducer_spec` is `None`, or it resolves to no
    ///   reducer), every update received on the new port is posted to
    ///   `port_id` immediately.
    /// - With a reducer, updates are buffered in an [`UpdateBuffer`]. They are
    ///   flushed as a single reduced update once the buffer reaches
    ///   `config::SPLIT_MAX_BUFFER_SIZE` entries, or when the alarm fires.
    ///   The alarm is armed with `reducer_opts.max_update_interval()` when an
    ///   update lands in an empty buffer.
    ///
    /// # Errors
    /// Returns an error if resolving the reducer from `reducer_spec` fails.
    ///
    /// # Panics
    /// With a reducer, a timer task is started with `tokio::spawn`, which
    /// panics when called outside a Tokio runtime.
    fn split(
        &self,
        port_id: PortId,
        reducer_spec: Option<ReducerSpec>,
        reducer_opts: Option<ReducerOpts>,
    ) -> anyhow::Result<PortId> {
        // Forwards one (possibly reduced) update to the downstream port with
        // empty headers.
        fn post(mailbox: &mailbox::Mailbox, port_id: PortId, msg: Serialized) {
            mailbox::MailboxSender::post(
                mailbox,
                MessageEnvelope::new(mailbox.actor_id().clone(), port_id, msg, Attrs::new()),
                // TODO(pzhang) figure out how to use upstream's return handle,
                // instead of getting a new one like this.
                // This is okay for now because upstream is currently also using
                // the same handle singleton, but that could change in the future.
                mailbox::monitored_return_handle(),
            );
        }

        let port_index = self.mailbox().allocate_port();
        let split_port = self.mailbox().actor_id().port_id(port_index);
        let mailbox = self.mailbox().clone();
        // `resolve_reducer` is fallible and may itself yield no reducer, hence
        // map -> transpose -> `?` -> flatten.
        let reducer = reducer_spec
            .map(
                |ReducerSpec {
                     typehash,
                     builder_params,
                 }| { accum::resolve_reducer(typehash, builder_params) },
            )
            .transpose()?
            .flatten();
        // On failure, the sender returns the rejected update together with the
        // error.
        let enqueue: Box<
            dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync,
        > = match reducer {
            None => Box::new(move |serialized: Serialized| {
                post(&mailbox, port_id.clone(), serialized);
                Ok(())
            }),
            Some(reducer) => {
                let buffer: Arc<Mutex<UpdateBuffer>> =
                    Arc::new(Mutex::new(UpdateBuffer::new(reducer)));

                let alarm = Alarm::new();

                // Timer task: each time the alarm fires, flush whatever is
                // buffered. It keeps its own clones of the buffer, target port
                // and mailbox.
                {
                    let mut sleeper = alarm.sleeper();
                    let buffer = Arc::clone(&buffer);
                    let port_id = port_id.clone();
                    let mailbox = mailbox.clone();
                    tokio::spawn(async move {
                        while sleeper.sleep().await {
                            let mut buf = buffer.lock().unwrap();
                            match buf.reduce() {
                                None => (),
                                Some(Ok(reduced)) => post(&mailbox, port_id.clone(), reduced),
                                // We simply ignore errors here, and let them be propagated
                                // later in the enqueueing function.
                                //
                                // If this is the last update, then this strategy will cause a hang.
                                // We should obtain a supervisor here from our send context and notify
                                // it.
                                //
                                // NOTE(review): the buffer stays non-empty after this error,
                                // and the enqueue closure only arms the alarm when the buffer
                                // was empty. So the remaining updates are only flushed once the
                                // size limit is reached.
                                Some(Err(e)) => tracing::error!(
                                    "error while reducing update: {}; waiting until the next send to propagate",
                                    e
                                ),
                            }
                        }
                    });
                }

                // Note: alarm is held in the closure while the port is active;
                // when it is dropped, the alarm terminates, and so does the sleeper
                // task.
                let alarm = Mutex::new(alarm);

                // Default to global configuration if not specified.
                let reducer_opts = reducer_opts.unwrap_or_default();

                Box::new(move |update: Serialized| {
                    // Hold the lock until messages are sent. This is to avoid another
                    // invocation of this method trying to send message concurrently and
                    // cause messages delivered out of order.
                    //
                    // We also always acquire alarm *after* the buffer, to avoid deadlocks.
                    let mut buf = buffer.lock().unwrap();
                    let was_empty = buf.is_empty();
                    match buf.push(update) {
                        // First update of a new batch: start the flush timer.
                        None if was_empty => {
                            alarm
                                .lock()
                                .unwrap()
                                .arm(reducer_opts.max_update_interval());
                            Ok(())
                        }
                        // Batch still filling; the timer is already running.
                        None => Ok(()),
                        // Size limit reached and reduced: flush now and cancel the timer.
                        Some(Ok(reduced)) => {
                            alarm.lock().unwrap().disarm();
                            post(&mailbox, port_id.clone(), reduced);
                            Ok(())
                        }
                        // Reduction failed. The reducer's updates were restored
                        // into the buffer; pop the most recent one (this `update`,
                        // assuming the reducer hands them back in order) and
                        // return it with the error.
                        Some(Err(e)) => Err((buf.pop().unwrap(), e)),
                    }
                })
            }
        };
        self.mailbox().bind_untyped(
            &split_port,
            mailbox::UntypedUnboundedSender {
                sender: enqueue,
                port_id: split_port.clone(),
            },
        );
        Ok(split_port)
    }
}
218
219struct UpdateBuffer {
220    buffered: Vec<Serialized>,
221    reducer: Box<dyn ErasedCommReducer + Send + Sync + 'static>,
222}
223
224impl UpdateBuffer {
225    fn new(reducer: Box<dyn ErasedCommReducer + Send + Sync + 'static>) -> Self {
226        Self {
227            buffered: Vec::new(),
228            reducer,
229        }
230    }
231
232    fn is_empty(&self) -> bool {
233        self.buffered.is_empty()
234    }
235
236    fn pop(&mut self) -> Option<Serialized> {
237        self.buffered.pop()
238    }
239
240    /// Push a new item to the buffer, and optionally return any items that should
241    /// be flushed.
242    fn push(&mut self, serialized: Serialized) -> Option<anyhow::Result<Serialized>> {
243        let limit = config::global::get(config::SPLIT_MAX_BUFFER_SIZE);
244
245        self.buffered.push(serialized);
246        if self.buffered.len() >= limit {
247            self.reduce()
248        } else {
249            None
250        }
251    }
252
253    fn reduce(&mut self) -> Option<anyhow::Result<Serialized>> {
254        if self.buffered.is_empty() {
255            None
256        } else {
257            match self.reducer.reduce_updates(take(&mut self.buffered)) {
258                Ok(reduced) => Some(Ok(reduced)),
259                Err((e, b)) => {
260                    self.buffered = b;
261                    Some(Err(e))
262                }
263            }
264        }
265    }
266}