1use std::mem::take;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::sync::OnceLock;
21
22use async_trait::async_trait;
23use backoff::ExponentialBackoffBuilder;
24use backoff::backoff::Backoff;
25use dashmap::DashSet;
26use hyperactor_config::attrs::Attrs;
27
28use crate::ActorId;
29use crate::Instance;
30use crate::PortId;
31use crate::accum;
32use crate::accum::ErasedCommReducer;
33use crate::accum::ReducerOpts;
34use crate::accum::ReducerSpec;
35use crate::config;
36use crate::mailbox;
37use crate::mailbox::MailboxSender;
38use crate::mailbox::MessageEnvelope;
39use crate::time::Alarm;
40
/// A context that provides access to a [`crate::Mailbox`].
///
/// The `crate::private::Sealed` supertrait presumably restricts implementations
/// to this crate (sealed-trait pattern) — confirm against `crate::private`.
pub trait Mailbox: crate::private::Sealed + Send + Sync {
    /// Returns the mailbox associated with this context.
    fn mailbox(&self) -> &crate::Mailbox;
}
46
/// A typed actor context: a [`Mailbox`] context that also exposes the
/// [`Instance`] of its actor type.
#[async_trait]
pub trait Actor: Mailbox {
    /// The actor type associated with this context.
    type A: crate::Actor;

    /// Returns the actor instance associated with this context.
    fn instance(&self) -> &Instance<Self::A>;
}
59
/// Crate-internal extension methods for [`Mailbox`] contexts.
///
/// This file implements it for every [`Actor`] context.
pub(crate) trait MailboxExt: Mailbox {
    /// Posts `data` to `dest` with the given `headers`.
    ///
    /// `return_undeliverable` is recorded on the outgoing envelope.
    fn post(&self, dest: PortId, headers: Attrs, data: wirevalue::Any, return_undeliverable: bool);

    /// Creates a new port whose messages are forwarded to `port_id` and
    /// returns the new port's id.
    ///
    /// If `reducer_spec` is provided, messages may be combined by the
    /// described reducer before forwarding; `reducer_opts` tunes how that
    /// combining is scheduled.
    fn split(
        &self,
        port_id: PortId,
        reducer_spec: Option<ReducerSpec>,
        reducer_opts: Option<ReducerOpts>,
        return_undeliverable: bool,
    ) -> anyhow::Result<PortId>;
}
76
/// Actor ids whose mailboxes have already logged the "posted without binding
/// `Undeliverable<MessageEnvelope>`" warning in `MailboxExt::post`.
///
/// Lazily initialized on first use. It ensures that warning (and its captured
/// backtrace) is emitted at most once per actor id. Nothing in this file
/// removes entries, so the set only grows.
static CAN_SEND_WARNED_MAILBOXES: OnceLock<DashSet<ActorId>> = OnceLock::new();
82
/// Blanket [`MailboxExt`] implementation for every typed actor context.
///
/// The `Actor` bound (rather than just `Mailbox`) limits posting and port
/// splitting to contexts that expose an actor [`Instance`].
impl<T: Actor + Send + Sync> MailboxExt for T {
    /// Posts `data` to `dest` on behalf of this context's actor.
    ///
    /// The envelope is addressed from this mailbox's actor id, carries
    /// `headers`, and has its return-undeliverable flag set to
    /// `return_undeliverable`. Undeliverable messages are routed to the
    /// mailbox's bound return handle. If none is bound, a warning with a
    /// captured backtrace is logged the first time this happens for a given
    /// actor id, and [`mailbox::monitored_return_handle`] is used instead.
    fn post(&self, dest: PortId, headers: Attrs, data: wirevalue::Any, return_undeliverable: bool) {
        let return_handle = self.mailbox().bound_return_handle().unwrap_or_else(|| {
            let actor_id = self.mailbox().actor_id();
            // `insert` reports whether the id was newly added, so the backtrace
            // capture and warning happen only once per actor id.
            if CAN_SEND_WARNED_MAILBOXES
                .get_or_init(DashSet::new)
                .insert(actor_id.clone())
            {
                let bt = std::backtrace::Backtrace::force_capture();
                tracing::warn!(
                    actor_id = ?actor_id,
                    backtrace = ?bt,
                    "mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
                );
            }
            // Fall back to the monitored return handle so the post still proceeds.
            mailbox::monitored_return_handle()
        });

        let mut envelope =
            MessageEnvelope::new(self.mailbox().actor_id().clone(), dest, data, headers);
        envelope.set_return_undeliverable(return_undeliverable);
        MailboxSender::post(self.mailbox(), envelope, return_handle);
    }

    /// Allocates and binds a new port on this mailbox whose messages are
    /// forwarded to `port_id`, and returns the new port's id.
    ///
    /// * If `reducer_spec` is `None`, or it resolves to no reducer, each
    ///   message sent to the new port is posted to `port_id` immediately.
    /// * Otherwise messages are accumulated in an [`UpdateBuffer`] and
    ///   forwarded as one reduced message. This happens either as soon as the
    ///   buffer reaches its size limit or when a background flush timer fires.
    ///   The timer delay follows an exponential backoff configured by
    ///   `reducer_opts`; defaults are used when it is `None`.
    ///
    /// Forwarded messages carry empty headers and use
    /// [`mailbox::monitored_return_handle`] as their return handle.
    /// `return_undeliverable` is set on each forwarded envelope.
    ///
    /// # Errors
    ///
    /// Returns an error if resolving the reducer from `reducer_spec` fails.
    /// The port index is allocated before that resolution, so on error the
    /// index is left allocated but unbound.
    ///
    /// # Panics
    ///
    /// In the reducer case the flush task is started with `tokio::spawn`,
    /// which panics when called outside a Tokio runtime.
    fn split(
        &self,
        port_id: PortId,
        reducer_spec: Option<ReducerSpec>,
        reducer_opts: Option<ReducerOpts>,
        return_undeliverable: bool,
    ) -> anyhow::Result<PortId> {
        /// Posts `msg` from `mailbox` to `port_id` with empty headers.
        ///
        /// Always uses the monitored return handle rather than any return
        /// handle bound on the mailbox.
        fn post(
            mailbox: &mailbox::Mailbox,
            port_id: PortId,
            msg: wirevalue::Any,
            return_undeliverable: bool,
        ) {
            let mut envelope =
                MessageEnvelope::new(mailbox.actor_id().clone(), port_id, msg, Attrs::new());
            envelope.set_return_undeliverable(return_undeliverable);
            mailbox::MailboxSender::post(
                mailbox,
                envelope,
                mailbox::monitored_return_handle(),
            );
        }

        let port_index = self.mailbox().allocate_port();
        let split_port = self.mailbox().actor_id().port_id(port_index);
        // Owned mailbox handle, moved into the closures/task created below.
        let mailbox = self.mailbox().clone();
        // `resolve_reducer` yields a `Result<Option<_>>`. `transpose` + `?`
        // propagates resolution errors, and `flatten` folds "no spec" and
        // "spec resolved to no reducer" into a single `None`.
        let reducer = reducer_spec
            .map(
                |ReducerSpec {
                     typehash,
                     builder_params,
                 }| { accum::resolve_reducer(typehash, builder_params) },
            )
            .transpose()?
            .flatten();
        // Callback invoked for each message sent to `split_port`. On failure it
        // hands the message back together with the error.
        let enqueue: Box<
            dyn Fn(wirevalue::Any) -> Result<(), (wirevalue::Any, anyhow::Error)> + Send + Sync,
        > = match reducer {
            None => Box::new(move |serialized: wirevalue::Any| {
                post(&mailbox, port_id.clone(), serialized, return_undeliverable);
                Ok(())
            }),
            Some(reducer) => {
                let buffer: Arc<Mutex<UpdateBuffer>> =
                    Arc::new(Mutex::new(UpdateBuffer::new(reducer)));

                // Timer that drives time-based flushes of `buffer`.
                let alarm = Alarm::new();

                {
                    // Background flusher: each time the alarm fires, reduce whatever
                    // is buffered and forward the result. The loop ends when `sleep`
                    // returns false (presumably once the `Alarm` is dropped; confirm
                    // against `crate::time::Alarm`).
                    //
                    // NOTE(review): this task holds a clone of `mailbox`, while the
                    // alarm lives inside the sender bound on that same mailbox.
                    // Confirm the port is eventually unbound/dropped so the task can
                    // exit.
                    let mut sleeper = alarm.sleeper();
                    let buffer = Arc::clone(&buffer);
                    let port_id = port_id.clone();
                    let mailbox = mailbox.clone();
                    tokio::spawn(async move {
                        while sleeper.sleep().await {
                            // The buffer lock is held while posting, as in the enqueue
                            // closure below, so a timed flush and a size-triggered send
                            // never interleave. The guard is dropped at the end of each
                            // iteration, before the next `.await`.
                            let mut buf = buffer.lock().unwrap();
                            match buf.reduce() {
                                // Nothing was buffered.
                                None => (),
                                Some(Ok(reduced)) => {
                                    post(&mailbox, port_id.clone(), reduced, return_undeliverable)
                                }
                                // `UpdateBuffer::reduce` put the updates back into the
                                // buffer. They are retried once a later send rearms the
                                // alarm or fills the buffer.
                                Some(Err(e)) => tracing::error!(
                                    "error while reducing update: {}; waiting until the next send to propagate",
                                    e
                                ),
                            }
                        }
                    });
                }

                // The enqueue closure is a shared `Fn`, so the alarm is wrapped in a
                // `Mutex` to get exclusive access when rearming/disarming it.
                let alarm = Mutex::new(alarm);

                let reducer_opts = reducer_opts.unwrap_or_default();
                let max_interval = reducer_opts.max_update_interval();
                let initial_interval = reducer_opts.initial_update_interval();

                // Flush delay starts at `initial_interval` and roughly doubles (the
                // backoff crate applies randomized jitter by default) on each
                // buffered update, capped at `max_interval`. With no max elapsed
                // time the backoff never gives up, so `next_backoff()` always
                // returns `Some`.
                //
                // NOTE(review): the backoff is never reset in this function, so once
                // it reaches `max_interval` all later timer arms use (roughly)
                // `max_interval`. Confirm this is intended.
                let backoff = Mutex::new(
                    ExponentialBackoffBuilder::new()
                        .with_initial_interval(initial_interval)
                        .with_multiplier(2.0)
                        .with_max_interval(max_interval)
                        .with_max_elapsed_time(None)
                        .build(),
                );

                // Locks are taken in the order buffer -> backoff -> alarm. The
                // flusher task above only takes the buffer lock.
                Box::new(move |update: wirevalue::Any| {
                    let mut buf = buffer.lock().unwrap();
                    match buf.push(update) {
                        // Still below the size limit: (re)arm the flush timer with
                        // the next backoff interval.
                        None => {
                            let interval = backoff.lock().unwrap().next_backoff().unwrap();
                            alarm.lock().unwrap().rearm(interval);
                            Ok(())
                        }
                        // Size limit reached and the buffer was reduced: disarm the
                        // flush timer and forward the reduced message now.
                        Some(Ok(reduced)) => {
                            alarm.lock().unwrap().disarm();
                            post(&mailbox, port_id.clone(), reduced, return_undeliverable);
                            Ok(())
                        }
                        // Reduction failed and `UpdateBuffer::reduce` restored the
                        // buffer. Pop the last element and return it with the error.
                        // That element is the update just pushed, assuming the reducer
                        // handed the updates back unchanged.
                        Some(Err(e)) => Err((buf.pop().unwrap(), e)),
                    }
                })
            }
        };
        // Register the callback as the handler for the newly allocated port.
        self.mailbox().bind_untyped(
            &split_port,
            mailbox::UntypedUnboundedSender {
                sender: enqueue,
                port_id: split_port.clone(),
            },
        );
        Ok(split_port)
    }
}
242
/// Serialized updates waiting to be combined by a reducer before being forwarded.
struct UpdateBuffer {
    /// Updates accumulated since the last successful reduction.
    buffered: Vec<wirevalue::Any>,
    /// Type-erased reducer used to combine `buffered` into a single message.
    reducer: Box<dyn ErasedCommReducer + Send + Sync + 'static>,
}
247
248impl UpdateBuffer {
249 fn new(reducer: Box<dyn ErasedCommReducer + Send + Sync + 'static>) -> Self {
250 Self {
251 buffered: Vec::new(),
252 reducer,
253 }
254 }
255
256 fn is_empty(&self) -> bool {
257 self.buffered.is_empty()
258 }
259
260 fn pop(&mut self) -> Option<wirevalue::Any> {
261 self.buffered.pop()
262 }
263
264 fn push(&mut self, serialized: wirevalue::Any) -> Option<anyhow::Result<wirevalue::Any>> {
267 let limit = hyperactor_config::global::get(config::SPLIT_MAX_BUFFER_SIZE);
268
269 self.buffered.push(serialized);
270 if self.buffered.len() >= limit {
271 self.reduce()
272 } else {
273 None
274 }
275 }
276
277 fn reduce(&mut self) -> Option<anyhow::Result<wirevalue::Any>> {
278 if self.buffered.is_empty() {
279 None
280 } else {
281 match self.reducer.reduce_updates(take(&mut self.buffered)) {
282 Ok(reduced) => Some(Ok(reduced)),
283 Err((e, b)) => {
284 self.buffered = b;
285 Some(Err(e))
286 }
287 }
288 }
289 }
290}