1use std::mem::take;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::sync::OnceLock;
21
22use async_trait::async_trait;
23use dashmap::DashSet;
24
25use crate::ActorId;
26use crate::Instance;
27use crate::PortId;
28use crate::accum;
29use crate::accum::ErasedCommReducer;
30use crate::accum::ReducerOpts;
31use crate::accum::ReducerSpec;
32use crate::attrs::Attrs;
33use crate::config;
34use crate::data::Serialized;
35use crate::mailbox;
36use crate::mailbox::MailboxSender;
37use crate::mailbox::MessageEnvelope;
38use crate::time::Alarm;
39
40pub trait Mailbox: crate::private::Sealed + Send + Sync {
42 fn mailbox(&self) -> &crate::Mailbox;
44}
45
46#[async_trait]
51pub trait Actor: Mailbox {
52 type A: crate::Actor;
54
55 fn instance(&self) -> &Instance<Self::A>;
57}
58
59pub(crate) trait MailboxExt: Mailbox {
62 fn post(&self, dest: PortId, headers: Attrs, data: Serialized);
65
66 fn split(
68 &self,
69 port_id: PortId,
70 reducer_spec: Option<ReducerSpec>,
71 reducer_opts: Option<ReducerOpts>,
72 ) -> anyhow::Result<PortId>;
73}
74
75static CAN_SEND_WARNED_MAILBOXES: OnceLock<DashSet<ActorId>> = OnceLock::new();
80
81impl<T: Mailbox + Send + Sync> MailboxExt for T {
83 fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
84 let return_handle = self.mailbox().bound_return_handle().unwrap_or_else(|| {
85 let actor_id = self.mailbox().actor_id();
86 if CAN_SEND_WARNED_MAILBOXES
87 .get_or_init(DashSet::new)
88 .insert(actor_id.clone())
89 {
90 let bt = std::backtrace::Backtrace::force_capture();
91 tracing::warn!(
92 actor_id = ?actor_id,
93 backtrace = ?bt,
94 "mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
95 );
96 }
97 mailbox::monitored_return_handle()
98 });
99
100 let envelope = MessageEnvelope::new(self.mailbox().actor_id().clone(), dest, data, headers);
101 MailboxSender::post(self.mailbox(), envelope, return_handle);
102 }
103
104 fn split(
105 &self,
106 port_id: PortId,
107 reducer_spec: Option<ReducerSpec>,
108 reducer_opts: Option<ReducerOpts>,
109 ) -> anyhow::Result<PortId> {
110 fn post(mailbox: &mailbox::Mailbox, port_id: PortId, msg: Serialized) {
111 mailbox::MailboxSender::post(
112 mailbox,
113 MessageEnvelope::new(mailbox.actor_id().clone(), port_id, msg, Attrs::new()),
114 mailbox::monitored_return_handle(),
119 );
120 }
121
122 let port_index = self.mailbox().allocate_port();
123 let split_port = self.mailbox().actor_id().port_id(port_index);
124 let mailbox = self.mailbox().clone();
125 let reducer = reducer_spec
126 .map(
127 |ReducerSpec {
128 typehash,
129 builder_params,
130 }| { accum::resolve_reducer(typehash, builder_params) },
131 )
132 .transpose()?
133 .flatten();
134 let enqueue: Box<
135 dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync,
136 > = match reducer {
137 None => Box::new(move |serialized: Serialized| {
138 post(&mailbox, port_id.clone(), serialized);
139 Ok(())
140 }),
141 Some(reducer) => {
142 let buffer: Arc<Mutex<UpdateBuffer>> =
143 Arc::new(Mutex::new(UpdateBuffer::new(reducer)));
144
145 let alarm = Alarm::new();
146
147 {
148 let mut sleeper = alarm.sleeper();
149 let buffer = Arc::clone(&buffer);
150 let port_id = port_id.clone();
151 let mailbox = mailbox.clone();
152 tokio::spawn(async move {
153 while sleeper.sleep().await {
154 let mut buf = buffer.lock().unwrap();
155 match buf.reduce() {
156 None => (),
157 Some(Ok(reduced)) => post(&mailbox, port_id.clone(), reduced),
158 Some(Err(e)) => tracing::error!(
165 "error while reducing update: {}; waiting until the next send to propagate",
166 e
167 ),
168 }
169 }
170 });
171 }
172
173 let alarm = Mutex::new(alarm);
177
178 let reducer_opts = reducer_opts.unwrap_or_default();
180
181 Box::new(move |update: Serialized| {
182 let mut buf = buffer.lock().unwrap();
188 let was_empty = buf.is_empty();
189 match buf.push(update) {
190 None if was_empty => {
191 alarm
192 .lock()
193 .unwrap()
194 .arm(reducer_opts.max_update_interval());
195 Ok(())
196 }
197 None => Ok(()),
198 Some(Ok(reduced)) => {
199 alarm.lock().unwrap().disarm();
200 post(&mailbox, port_id.clone(), reduced);
201 Ok(())
202 }
203 Some(Err(e)) => Err((buf.pop().unwrap(), e)),
204 }
205 })
206 }
207 };
208 self.mailbox().bind_untyped(
209 &split_port,
210 mailbox::UntypedUnboundedSender {
211 sender: enqueue,
212 port_id: split_port.clone(),
213 },
214 );
215 Ok(split_port)
216 }
217}
218
219struct UpdateBuffer {
220 buffered: Vec<Serialized>,
221 reducer: Box<dyn ErasedCommReducer + Send + Sync + 'static>,
222}
223
224impl UpdateBuffer {
225 fn new(reducer: Box<dyn ErasedCommReducer + Send + Sync + 'static>) -> Self {
226 Self {
227 buffered: Vec::new(),
228 reducer,
229 }
230 }
231
232 fn is_empty(&self) -> bool {
233 self.buffered.is_empty()
234 }
235
236 fn pop(&mut self) -> Option<Serialized> {
237 self.buffered.pop()
238 }
239
240 fn push(&mut self, serialized: Serialized) -> Option<anyhow::Result<Serialized>> {
243 let limit = config::global::get(config::SPLIT_MAX_BUFFER_SIZE);
244
245 self.buffered.push(serialized);
246 if self.buffered.len() >= limit {
247 self.reduce()
248 } else {
249 None
250 }
251 }
252
253 fn reduce(&mut self) -> Option<anyhow::Result<Serialized>> {
254 if self.buffered.is_empty() {
255 None
256 } else {
257 match self.reducer.reduce_updates(take(&mut self.buffered)) {
258 Ok(reduced) => Some(Ok(reduced)),
259 Err((e, b)) => {
260 self.buffered = b;
261 Some(Err(e))
262 }
263 }
264 }
265 }
266}