monarch_distributed_telemetry/
entity_dispatcher.rs1use std::sync::Arc;
20use std::sync::Mutex;
21
22use hyperactor_telemetry::EntityEvent;
23use hyperactor_telemetry::EntityEventDispatcher;
24use monarch_record_batch::RecordBatchBuffer;
25use monarch_record_batch::RecordBatchRow;
26
27use crate::record_batch_sink::FlushCallback;
28use crate::timestamp_to_micros;
29
/// One row of the `actors` table, built from an `EntityEvent::Actor` event.
///
/// NOTE(review): the `RecordBatchRow` derive presumably generates the
/// `ActorBuffer` column buffer used by `EntityDispatcherInner`, and field
/// order likely determines column order — confirm before reordering fields.
#[derive(RecordBatchRow)]
pub struct Actor {
    /// Actor id, copied from the event.
    pub id: u64,
    /// Event timestamp converted with `timestamp_to_micros` (microseconds).
    pub timestamp_us: i64,
    /// Id of the mesh this actor belongs to, copied from the event.
    pub mesh_id: u64,
    /// Rank of the actor, copied from the event.
    pub rank: u64,
    /// Full name of the actor, copied from the event.
    pub full_name: String,
    /// Optional human-readable name; `None` when the event carries none.
    pub display_name: Option<String>,
}
47
/// One row of the `meshes` table, built from an `EntityEvent::Mesh` event.
///
/// NOTE(review): field order likely determines the generated column order
/// of `MeshBuffer` — confirm before reordering fields.
#[derive(RecordBatchRow)]
pub struct Mesh {
    /// Mesh id, copied from the event.
    pub id: u64,
    /// Event timestamp converted with `timestamp_to_micros` (microseconds).
    pub timestamp_us: i64,
    /// Class of the mesh, copied from the event.
    pub class: String,
    /// Name given to the mesh, copied from the event.
    pub given_name: String,
    /// Full name of the mesh, copied from the event.
    pub full_name: String,
    /// Serialized mesh shape (presumably JSON, per the name — TODO confirm).
    pub shape_json: String,
    /// Id of the parent mesh, if the event carries one.
    pub parent_mesh_id: Option<u64>,
    /// Serialized view into the parent mesh, if the event carries one.
    pub parent_view_json: Option<String>,
}
69
/// One row of the `actor_status_events` table, built from an
/// `EntityEvent::ActorStatus` event.
///
/// NOTE(review): field order likely determines the generated column order
/// of `ActorStatusEventBuffer` — confirm before reordering fields.
#[derive(RecordBatchRow)]
pub struct ActorStatusEvent {
    /// Status-event id, copied from the event.
    pub id: u64,
    /// Event timestamp converted with `timestamp_to_micros` (microseconds).
    pub timestamp_us: i64,
    /// Id of the actor whose status changed (presumably `Actor::id`).
    pub actor_id: u64,
    /// The status the actor transitioned to, as a string.
    pub new_status: String,
    /// Optional reason for the transition.
    pub reason: Option<String>,
}
85
/// One row of the `sent_messages` table, built from an
/// `EntityEvent::SentMessage` event.
///
/// NOTE(review): field order likely determines the generated column order
/// of `SentMessageBuffer` — confirm before reordering fields.
#[derive(RecordBatchRow)]
pub struct SentMessage {
    /// Row id. Not taken from the event: the dispatcher generates it with
    /// `hyperactor_telemetry::generate_sent_message_id(sender_actor_id)`.
    pub id: u64,
    /// Event timestamp converted with `timestamp_to_micros` (microseconds).
    pub timestamp_us: i64,
    /// Id of the sending actor.
    pub sender_actor_id: u64,
    /// Id of the actor mesh the message was sent to.
    pub actor_mesh_id: u64,
    /// Serialized view of the destination (presumably JSON — TODO confirm).
    pub view_json: String,
    /// Serialized shape of the destination (presumably JSON — TODO confirm).
    pub shape_json: String,
}
104
/// One row of the `messages` table, built from an `EntityEvent::Message`
/// event.
///
/// NOTE(review): field order likely determines the generated column order
/// of `MessageBuffer` — confirm before reordering fields.
#[derive(RecordBatchRow)]
pub struct Message {
    /// Message id, copied from the event.
    pub id: u64,
    /// Event timestamp converted with `timestamp_to_micros` (microseconds).
    pub timestamp_us: i64,
    /// Id of the sending actor.
    pub from_actor_id: u64,
    /// Id of the receiving actor.
    pub to_actor_id: u64,
    /// Optional endpoint name the message targeted.
    pub endpoint: Option<String>,
    /// Optional port id the message targeted.
    pub port_id: Option<u64>,
}
123
/// One row of the `message_status_events` table, built from an
/// `EntityEvent::MessageStatus` event.
///
/// NOTE(review): field order likely determines the generated column order
/// of `MessageStatusEventBuffer` — confirm before reordering fields.
#[derive(RecordBatchRow)]
pub struct MessageStatusEvent {
    /// Status-event id, copied from the event.
    pub id: u64,
    /// Event timestamp converted with `timestamp_to_micros` (microseconds).
    pub timestamp_us: i64,
    /// Id of the message this status refers to (presumably `Message::id`).
    pub message_id: u64,
    /// The reported status, as a string.
    pub status: String,
}
138
/// Mutable state kept behind `EntityDispatcher`'s mutex: one row buffer per
/// telemetry table, the flush threshold, and the callback that receives
/// flushed batches.
struct EntityDispatcherInner {
    /// Buffered rows for the `actors` table.
    actors_buffer: ActorBuffer,
    /// Buffered rows for the `meshes` table.
    meshes_buffer: MeshBuffer,
    /// Buffered rows for the `actor_status_events` table.
    actor_status_events_buffer: ActorStatusEventBuffer,
    /// Buffered rows for the `sent_messages` table.
    sent_messages_buffer: SentMessageBuffer,
    /// Buffered rows for the `messages` table.
    messages_buffer: MessageBuffer,
    /// Buffered rows for the `message_status_events` table.
    message_status_events_buffer: MessageStatusEventBuffer,
    /// A table's buffer is flushed on insert once it holds at least this
    /// many rows (a value of 0 flushes on every insert).
    batch_size: usize,
    /// Invoked with `(table_name, batch)` for every flushed batch.
    flush_callback: FlushCallback,
}
150
151impl EntityDispatcherInner {
152 fn flush_buffer<B: RecordBatchBuffer>(
153 buffer: &mut B,
154 table_name: &str,
155 callback: &FlushCallback,
156 ) -> anyhow::Result<()> {
157 let batch = buffer.drain_to_record_batch()?;
158 callback(table_name, batch);
159 Ok(())
160 }
161
162 fn flush(&mut self) -> anyhow::Result<()> {
163 Self::flush_buffer(&mut self.actors_buffer, "actors", &self.flush_callback)?;
164 Self::flush_buffer(&mut self.meshes_buffer, "meshes", &self.flush_callback)?;
165 Self::flush_buffer(
166 &mut self.actor_status_events_buffer,
167 "actor_status_events",
168 &self.flush_callback,
169 )?;
170 Self::flush_buffer(
171 &mut self.sent_messages_buffer,
172 "sent_messages",
173 &self.flush_callback,
174 )?;
175 Self::flush_buffer(&mut self.messages_buffer, "messages", &self.flush_callback)?;
176 Self::flush_buffer(
177 &mut self.message_status_events_buffer,
178 "message_status_events",
179 &self.flush_callback,
180 )?;
181 Ok(())
182 }
183
184 fn flush_actors_if_full(&mut self) -> anyhow::Result<()> {
185 if self.actors_buffer.len() >= self.batch_size {
186 Self::flush_buffer(&mut self.actors_buffer, "actors", &self.flush_callback)?;
187 }
188 Ok(())
189 }
190
191 fn flush_meshes_if_full(&mut self) -> anyhow::Result<()> {
192 if self.meshes_buffer.len() >= self.batch_size {
193 Self::flush_buffer(&mut self.meshes_buffer, "meshes", &self.flush_callback)?;
194 }
195 Ok(())
196 }
197
198 fn flush_actor_status_events_if_full(&mut self) -> anyhow::Result<()> {
199 if self.actor_status_events_buffer.len() >= self.batch_size {
200 Self::flush_buffer(
201 &mut self.actor_status_events_buffer,
202 "actor_status_events",
203 &self.flush_callback,
204 )?;
205 }
206 Ok(())
207 }
208
209 fn flush_sent_messages_if_full(&mut self) -> anyhow::Result<()> {
210 if self.sent_messages_buffer.len() >= self.batch_size {
211 Self::flush_buffer(
212 &mut self.sent_messages_buffer,
213 "sent_messages",
214 &self.flush_callback,
215 )?;
216 }
217 Ok(())
218 }
219
220 fn flush_messages_if_full(&mut self) -> anyhow::Result<()> {
221 if self.messages_buffer.len() >= self.batch_size {
222 Self::flush_buffer(&mut self.messages_buffer, "messages", &self.flush_callback)?;
223 }
224 Ok(())
225 }
226
227 fn flush_message_status_events_if_full(&mut self) -> anyhow::Result<()> {
228 if self.message_status_events_buffer.len() >= self.batch_size {
229 Self::flush_buffer(
230 &mut self.message_status_events_buffer,
231 "message_status_events",
232 &self.flush_callback,
233 )?;
234 }
235 Ok(())
236 }
237}
238
239#[derive(Clone)]
244pub struct EntityDispatcher {
245 inner: Arc<Mutex<EntityDispatcherInner>>,
246}
247
248impl EntityDispatcher {
249 pub fn new(batch_size: usize, flush_callback: FlushCallback) -> Self {
259 let inner = Arc::new(Mutex::new(EntityDispatcherInner {
260 actors_buffer: ActorBuffer::default(),
261 meshes_buffer: MeshBuffer::default(),
262 actor_status_events_buffer: ActorStatusEventBuffer::default(),
263 sent_messages_buffer: SentMessageBuffer::default(),
264 messages_buffer: MessageBuffer::default(),
265 message_status_events_buffer: MessageStatusEventBuffer::default(),
266 batch_size,
267 flush_callback,
268 }));
269 Self { inner }
270 }
271
272 pub fn flush(&self) -> anyhow::Result<()> {
278 let mut inner = self
279 .inner
280 .lock()
281 .map_err(|_| anyhow::anyhow!("lock poisoned"))?;
282 inner.flush()
283 }
284}
285
286impl EntityEventDispatcher for EntityDispatcher {
287 fn dispatch(&self, event: EntityEvent) -> Result<(), anyhow::Error> {
288 let mut inner = self
289 .inner
290 .lock()
291 .map_err(|_| anyhow::anyhow!("lock poisoned"))?;
292
293 match event {
294 EntityEvent::Actor(actor_event) => {
295 inner.actors_buffer.insert(Actor {
296 id: actor_event.id,
297 timestamp_us: timestamp_to_micros(&actor_event.timestamp),
298 mesh_id: actor_event.mesh_id,
299 rank: actor_event.rank,
300 full_name: actor_event.full_name,
301 display_name: actor_event.display_name,
302 });
303 inner.flush_actors_if_full()?;
304 }
305 EntityEvent::Mesh(mesh_event) => {
306 inner.meshes_buffer.insert(Mesh {
307 id: mesh_event.id,
308 timestamp_us: timestamp_to_micros(&mesh_event.timestamp),
309 class: mesh_event.class,
310 given_name: mesh_event.given_name,
311 full_name: mesh_event.full_name,
312 shape_json: mesh_event.shape_json,
313 parent_mesh_id: mesh_event.parent_mesh_id,
314 parent_view_json: mesh_event.parent_view_json,
315 });
316 inner.flush_meshes_if_full()?;
317 }
318 EntityEvent::ActorStatus(status_event) => {
319 inner.actor_status_events_buffer.insert(ActorStatusEvent {
320 id: status_event.id,
321 timestamp_us: timestamp_to_micros(&status_event.timestamp),
322 actor_id: status_event.actor_id,
323 new_status: status_event.new_status,
324 reason: status_event.reason,
325 });
326 inner.flush_actor_status_events_if_full()?;
327 }
328 EntityEvent::SentMessage(event) => {
329 inner.sent_messages_buffer.insert(SentMessage {
330 id: hyperactor_telemetry::generate_sent_message_id(event.sender_actor_id),
331 timestamp_us: timestamp_to_micros(&event.timestamp),
332 sender_actor_id: event.sender_actor_id,
333 actor_mesh_id: event.actor_mesh_id,
334 view_json: event.view_json,
335 shape_json: event.shape_json,
336 });
337 inner.flush_sent_messages_if_full()?;
338 }
339 EntityEvent::Message(event) => {
340 inner.messages_buffer.insert(Message {
341 id: event.id,
342 timestamp_us: timestamp_to_micros(&event.timestamp),
343 from_actor_id: event.from_actor_id,
344 to_actor_id: event.to_actor_id,
345 endpoint: event.endpoint,
346 port_id: event.port_id,
347 });
348 inner.flush_messages_if_full()?;
349 }
350 EntityEvent::MessageStatus(event) => {
351 inner
352 .message_status_events_buffer
353 .insert(MessageStatusEvent {
354 id: event.id,
355 timestamp_us: timestamp_to_micros(&event.timestamp),
356 message_id: event.message_id,
357 status: event.status,
358 });
359 inner.flush_message_status_events_if_full()?;
360 }
361 }
362 Ok(())
363 }
364}