1use std::mem::take;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::sync::OnceLock;
21
22use async_trait::async_trait;
23use dashmap::DashSet;
24
25use crate::ActorId;
26use crate::Instance;
27use crate::PortId;
28use crate::accum;
29use crate::accum::ErasedCommReducer;
30use crate::accum::ReducerOpts;
31use crate::accum::ReducerSpec;
32use crate::attrs::Attrs;
33use crate::config;
34use crate::data::Serialized;
35use crate::mailbox;
36use crate::mailbox::MailboxSender;
37use crate::mailbox::MessageEnvelope;
38use crate::time::Alarm;
39
/// A sealed trait (it requires `crate::private::Sealed`) for thread-safe types
/// that can hand out a reference to a [`crate::Mailbox`].
pub trait Mailbox: crate::private::Sealed + Send + Sync {
    /// Returns a reference to the mailbox associated with this value.
    fn mailbox(&self) -> &crate::Mailbox;
}
45
/// A [`Mailbox`] that is additionally tied to a concrete actor type and can
/// hand out that actor's [`Instance`].
#[async_trait]
pub trait Actor: Mailbox {
    /// The actor type whose instance this value exposes.
    type A: crate::Actor;

    /// Returns a reference to the [`Instance`] of the associated actor type.
    fn instance(&self) -> &Instance<Self::A>;
}
58
/// Crate-private extension methods available on [`Mailbox`] types: posting a
/// serialized message to a port, and splitting a port.
pub(crate) trait MailboxExt: Mailbox {
    /// Posts the serialized `data`, together with `headers`, to the port `dest`.
    ///
    /// `return_undeliverable` is recorded on the outgoing envelope; presumably
    /// it controls whether the message is returned to the sender when it cannot
    /// be delivered (confirm against `MessageEnvelope`).
    fn post(&self, dest: PortId, headers: Attrs, data: Serialized, return_undeliverable: bool);

    /// Creates a new port on this mailbox whose incoming messages are forwarded
    /// to `port_id`, and returns the id of that new port.
    ///
    /// When `reducer_spec` resolves to a reducer, incoming updates are buffered
    /// and combined before being forwarded; `reducer_opts` supplies the maximum
    /// interval between forwarded updates. `return_undeliverable` is recorded on
    /// every forwarded envelope.
    ///
    /// # Errors
    ///
    /// Returns an error if the reducer described by `reducer_spec` cannot be
    /// resolved.
    fn split(
        &self,
        port_id: PortId,
        reducer_spec: Option<ReducerSpec>,
        reducer_opts: Option<ReducerOpts>,
        return_undeliverable: bool,
    ) -> anyhow::Result<PortId>;
}
75
/// Lazily initialized set of actor ids for which `MailboxExt::post` has already
/// logged its "posted without a bound return handle" warning, so that the
/// warning is emitted only once per actor id.
static CAN_SEND_WARNED_MAILBOXES: OnceLock<DashSet<ActorId>> = OnceLock::new();
81
82impl<T: Actor + Send + Sync> MailboxExt for T {
84 fn post(&self, dest: PortId, headers: Attrs, data: Serialized, return_undeliverable: bool) {
85 let return_handle = self.mailbox().bound_return_handle().unwrap_or_else(|| {
86 let actor_id = self.mailbox().actor_id();
87 if CAN_SEND_WARNED_MAILBOXES
88 .get_or_init(DashSet::new)
89 .insert(actor_id.clone())
90 {
91 let bt = std::backtrace::Backtrace::force_capture();
92 tracing::warn!(
93 actor_id = ?actor_id,
94 backtrace = ?bt,
95 "mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
96 );
97 }
98 mailbox::monitored_return_handle()
99 });
100
101 let mut envelope =
102 MessageEnvelope::new(self.mailbox().actor_id().clone(), dest, data, headers);
103 envelope.set_return_undeliverable(return_undeliverable);
104 MailboxSender::post(self.mailbox(), envelope, return_handle);
105 }
106
107 fn split(
108 &self,
109 port_id: PortId,
110 reducer_spec: Option<ReducerSpec>,
111 reducer_opts: Option<ReducerOpts>,
112 return_undeliverable: bool,
113 ) -> anyhow::Result<PortId> {
114 fn post(
115 mailbox: &mailbox::Mailbox,
116 port_id: PortId,
117 msg: Serialized,
118 return_undeliverable: bool,
119 ) {
120 let mut envelope =
121 MessageEnvelope::new(mailbox.actor_id().clone(), port_id, msg, Attrs::new());
122 envelope.set_return_undeliverable(return_undeliverable);
123 mailbox::MailboxSender::post(
124 mailbox,
125 envelope,
126 mailbox::monitored_return_handle(),
131 );
132 }
133
134 let port_index = self.mailbox().allocate_port();
135 let split_port = self.mailbox().actor_id().port_id(port_index);
136 let mailbox = self.mailbox().clone();
137 let reducer = reducer_spec
138 .map(
139 |ReducerSpec {
140 typehash,
141 builder_params,
142 }| { accum::resolve_reducer(typehash, builder_params) },
143 )
144 .transpose()?
145 .flatten();
146 let enqueue: Box<
147 dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync,
148 > = match reducer {
149 None => Box::new(move |serialized: Serialized| {
150 post(&mailbox, port_id.clone(), serialized, return_undeliverable);
151 Ok(())
152 }),
153 Some(reducer) => {
154 let buffer: Arc<Mutex<UpdateBuffer>> =
155 Arc::new(Mutex::new(UpdateBuffer::new(reducer)));
156
157 let alarm = Alarm::new();
158
159 {
160 let mut sleeper = alarm.sleeper();
161 let buffer = Arc::clone(&buffer);
162 let port_id = port_id.clone();
163 let mailbox = mailbox.clone();
164 tokio::spawn(async move {
165 while sleeper.sleep().await {
166 let mut buf = buffer.lock().unwrap();
167 match buf.reduce() {
168 None => (),
169 Some(Ok(reduced)) => {
170 post(&mailbox, port_id.clone(), reduced, return_undeliverable)
171 }
172 Some(Err(e)) => tracing::error!(
179 "error while reducing update: {}; waiting until the next send to propagate",
180 e
181 ),
182 }
183 }
184 });
185 }
186
187 let alarm = Mutex::new(alarm);
191
192 let reducer_opts = reducer_opts.unwrap_or_default();
194
195 Box::new(move |update: Serialized| {
196 let mut buf = buffer.lock().unwrap();
202 let was_empty = buf.is_empty();
203 match buf.push(update) {
204 None if was_empty => {
205 alarm
206 .lock()
207 .unwrap()
208 .arm(reducer_opts.max_update_interval());
209 Ok(())
210 }
211 None => Ok(()),
212 Some(Ok(reduced)) => {
213 alarm.lock().unwrap().disarm();
214 post(&mailbox, port_id.clone(), reduced, return_undeliverable);
215 Ok(())
216 }
217 Some(Err(e)) => Err((buf.pop().unwrap(), e)),
218 }
219 })
220 }
221 };
222 self.mailbox().bind_untyped(
223 &split_port,
224 mailbox::UntypedUnboundedSender {
225 sender: enqueue,
226 port_id: split_port.clone(),
227 },
228 );
229 Ok(split_port)
230 }
231}
232
/// A buffer of serialized updates paired with the reducer used to combine them.
struct UpdateBuffer {
    /// Updates received so far that have not yet been reduced.
    buffered: Vec<Serialized>,
    /// Type-erased reducer applied to `buffered` when the buffer is reduced.
    reducer: Box<dyn ErasedCommReducer + Send + Sync + 'static>,
}
237
238impl UpdateBuffer {
239 fn new(reducer: Box<dyn ErasedCommReducer + Send + Sync + 'static>) -> Self {
240 Self {
241 buffered: Vec::new(),
242 reducer,
243 }
244 }
245
246 fn is_empty(&self) -> bool {
247 self.buffered.is_empty()
248 }
249
250 fn pop(&mut self) -> Option<Serialized> {
251 self.buffered.pop()
252 }
253
254 fn push(&mut self, serialized: Serialized) -> Option<anyhow::Result<Serialized>> {
257 let limit = config::global::get(config::SPLIT_MAX_BUFFER_SIZE);
258
259 self.buffered.push(serialized);
260 if self.buffered.len() >= limit {
261 self.reduce()
262 } else {
263 None
264 }
265 }
266
267 fn reduce(&mut self) -> Option<anyhow::Result<Serialized>> {
268 if self.buffered.is_empty() {
269 None
270 } else {
271 match self.reducer.reduce_updates(take(&mut self.buffered)) {
272 Ok(reduced) => Some(Ok(reduced)),
273 Err((e, b)) => {
274 self.buffered = b;
275 Some(Err(e))
276 }
277 }
278 }
279 }
280}