monarch_distributed_telemetry/
entity_dispatcher.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! EntityDispatcher - Dispatches entity lifecycle events to Arrow RecordBatches
10//!
11//! Produces tables:
12//! - `actors`: Actor creation events
13//! - `meshes`: Mesh creation events
14//! - `actor_status_events`: Actor status change events
15//! - `sent_messages`: Sent message events
16//! - `messages`: Received message events
17//! - `message_status_events`: Received message status transitions
18
19use std::sync::Arc;
20use std::sync::Mutex;
21
22use hyperactor_telemetry::EntityEvent;
23use hyperactor_telemetry::EntityEventDispatcher;
24use monarch_record_batch::RecordBatchBuffer;
25use monarch_record_batch::RecordBatchRow;
26
27use crate::record_batch_sink::FlushCallback;
28use crate::timestamp_to_micros;
29
/// Row data for the actors table.
/// Logged when actors are created.
///
/// One row is appended for every `EntityEvent::Actor` handed to
/// `EntityDispatcher::dispatch`; batches are emitted under the table name
/// `"actors"`.
// NOTE(review): the `RecordBatchRow` derive presumably generates the
// `ActorBuffer` type used by `EntityDispatcherInner` -- confirm against the macro.
#[derive(RecordBatchRow)]
pub struct Actor {
    /// Unique identifier for this actor
    pub id: u64,
    /// Timestamp in microseconds since Unix epoch
    /// (computed from the event timestamp with `timestamp_to_micros`)
    pub timestamp_us: i64,
    /// ID of the mesh this actor belongs to
    pub mesh_id: u64,
    /// Rank index into the mesh shape
    pub rank: u64,
    /// Full hierarchical name of this actor
    pub full_name: String,
    /// User-facing name for this actor (`None` when the event carries none)
    pub display_name: Option<String>,
}
47
/// Row data for the meshes table.
/// Logged when meshes are created.
///
/// One row is appended for every `EntityEvent::Mesh` handed to
/// `EntityDispatcher::dispatch`; batches are emitted under the table name
/// `"meshes"`.
// NOTE(review): the `RecordBatchRow` derive presumably generates the
// `MeshBuffer` type used by `EntityDispatcherInner` -- confirm against the macro.
#[derive(RecordBatchRow)]
pub struct Mesh {
    /// Unique identifier for this mesh
    pub id: u64,
    /// Timestamp in microseconds since Unix epoch
    /// (computed from the event timestamp with `timestamp_to_micros`)
    pub timestamp_us: i64,
    /// Mesh class (e.g., "Proc", "Host", "Python<SomeUserDefinedActor>")
    pub class: String,
    /// User-provided name for this mesh
    pub given_name: String,
    /// Full hierarchical name as it appears in supervision events
    pub full_name: String,
    /// Shape of the mesh, serialized from ndslice::Extent
    pub shape_json: String,
    /// Parent mesh ID (None for root meshes)
    pub parent_mesh_id: Option<u64>,
    /// Region over which the parent spawned this mesh, serialized from ndslice::Region
    pub parent_view_json: Option<String>,
}
69
/// Row data for the actor_status_events table.
/// Logged when actors change status.
///
/// One row is appended for every `EntityEvent::ActorStatus` handed to
/// `EntityDispatcher::dispatch`; batches are emitted under the table name
/// `"actor_status_events"`.
// NOTE(review): the `RecordBatchRow` derive presumably generates the
// `ActorStatusEventBuffer` type used by `EntityDispatcherInner` -- confirm
// against the macro.
#[derive(RecordBatchRow)]
pub struct ActorStatusEvent {
    /// Unique identifier for this event
    pub id: u64,
    /// Timestamp in microseconds since Unix epoch
    /// (computed from the event timestamp with `timestamp_to_micros`)
    pub timestamp_us: i64,
    /// ID of the actor whose status changed
    pub actor_id: u64,
    /// New status value (e.g. "Created", "Idle", "Failed")
    pub new_status: String,
    /// Reason for the status change (e.g. error message for Failed)
    pub reason: Option<String>,
}
85
/// Row data for the sent_messages table.
///
/// Tracks messages from the perspective of the sending actor.
///
/// One row is appended for every `EntityEvent::SentMessage` handed to
/// `EntityDispatcher::dispatch`; batches are emitted under the table name
/// `"sent_messages"`.
// NOTE(review): the `RecordBatchRow` derive presumably generates the
// `SentMessageBuffer` type used by `EntityDispatcherInner` -- confirm against
// the macro.
#[derive(RecordBatchRow)]
pub struct SentMessage {
    /// Unique identifier for this sent message record.
    ///
    /// Unlike the other tables, this is not copied from the event: the
    /// dispatcher fills it with
    /// `hyperactor_telemetry::generate_sent_message_id(sender_actor_id)`.
    pub id: u64,
    /// Timestamp in microseconds since Unix epoch
    /// (computed from the event timestamp with `timestamp_to_micros`)
    pub timestamp_us: i64,
    /// ID of the sending actor
    pub sender_actor_id: u64,
    /// ID of the actor mesh over which the message was sent (0 for point-to-point)
    pub actor_mesh_id: u64,
    /// Region over which the message was sent, serialized from ndslice::Region
    pub view_json: String,
    /// Shape of the message, serialized from ndslice::Shape
    pub shape_json: String,
}
104
/// Row data for the messages table.
///
/// Tracks messages from the receiver's perspective.
///
/// One row is appended for every `EntityEvent::Message` handed to
/// `EntityDispatcher::dispatch`; batches are emitted under the table name
/// `"messages"`.
// NOTE(review): the `RecordBatchRow` derive presumably generates the
// `MessageBuffer` type used by `EntityDispatcherInner` -- confirm against the
// macro.
#[derive(RecordBatchRow)]
pub struct Message {
    /// Unique identifier for this received message
    /// (referenced by `MessageStatusEvent::message_id`)
    pub id: u64,
    /// Timestamp in microseconds since Unix epoch
    /// (computed from the event timestamp with `timestamp_to_micros`)
    pub timestamp_us: i64,
    /// Hash of sender's ActorId
    pub from_actor_id: u64,
    /// Hash of receiver's ActorId
    pub to_actor_id: u64,
    /// Message handler type name
    pub endpoint: Option<String>,
    /// Port identifier (reserved)
    pub port_id: Option<u64>,
}
123
/// Row data for the message_status_events table.
///
/// Logs status transitions for received messages.
///
/// One row is appended for every `EntityEvent::MessageStatus` handed to
/// `EntityDispatcher::dispatch`; batches are emitted under the table name
/// `"message_status_events"`.
// NOTE(review): the `RecordBatchRow` derive presumably generates the
// `MessageStatusEventBuffer` type used by `EntityDispatcherInner` -- confirm
// against the macro.
#[derive(RecordBatchRow)]
pub struct MessageStatusEvent {
    /// Unique identifier for this status event
    pub id: u64,
    /// Timestamp in microseconds since Unix epoch
    /// (computed from the event timestamp with `timestamp_to_micros`)
    pub timestamp_us: i64,
    /// FK to messages.id
    pub message_id: u64,
    /// Status value: "queued", "active", or "complete"
    pub status: String,
}
138
/// Inner state of EntityDispatcher.
///
/// Holds one row buffer per output table plus the flush configuration.
/// `EntityDispatcher` keeps this behind an `Arc<Mutex<..>>`, so every method
/// here runs with the lock held.
struct EntityDispatcherInner {
    /// Pending rows for the `"actors"` table.
    actors_buffer: ActorBuffer,
    /// Pending rows for the `"meshes"` table.
    meshes_buffer: MeshBuffer,
    /// Pending rows for the `"actor_status_events"` table.
    actor_status_events_buffer: ActorStatusEventBuffer,
    /// Pending rows for the `"sent_messages"` table.
    sent_messages_buffer: SentMessageBuffer,
    /// Pending rows for the `"messages"` table.
    messages_buffer: MessageBuffer,
    /// Pending rows for the `"message_status_events"` table.
    message_status_events_buffer: MessageStatusEventBuffer,
    /// Row count at which a buffer is flushed after an insert
    /// (a buffer is flushed once its `len()` is `>= batch_size`).
    batch_size: usize,
    /// Invoked with `(table_name, record_batch)` each time a buffer is drained.
    flush_callback: FlushCallback,
}
150
151impl EntityDispatcherInner {
152    fn flush_buffer<B: RecordBatchBuffer>(
153        buffer: &mut B,
154        table_name: &str,
155        callback: &FlushCallback,
156    ) -> anyhow::Result<()> {
157        let batch = buffer.drain_to_record_batch()?;
158        callback(table_name, batch);
159        Ok(())
160    }
161
162    fn flush(&mut self) -> anyhow::Result<()> {
163        Self::flush_buffer(&mut self.actors_buffer, "actors", &self.flush_callback)?;
164        Self::flush_buffer(&mut self.meshes_buffer, "meshes", &self.flush_callback)?;
165        Self::flush_buffer(
166            &mut self.actor_status_events_buffer,
167            "actor_status_events",
168            &self.flush_callback,
169        )?;
170        Self::flush_buffer(
171            &mut self.sent_messages_buffer,
172            "sent_messages",
173            &self.flush_callback,
174        )?;
175        Self::flush_buffer(&mut self.messages_buffer, "messages", &self.flush_callback)?;
176        Self::flush_buffer(
177            &mut self.message_status_events_buffer,
178            "message_status_events",
179            &self.flush_callback,
180        )?;
181        Ok(())
182    }
183
184    fn flush_actors_if_full(&mut self) -> anyhow::Result<()> {
185        if self.actors_buffer.len() >= self.batch_size {
186            Self::flush_buffer(&mut self.actors_buffer, "actors", &self.flush_callback)?;
187        }
188        Ok(())
189    }
190
191    fn flush_meshes_if_full(&mut self) -> anyhow::Result<()> {
192        if self.meshes_buffer.len() >= self.batch_size {
193            Self::flush_buffer(&mut self.meshes_buffer, "meshes", &self.flush_callback)?;
194        }
195        Ok(())
196    }
197
198    fn flush_actor_status_events_if_full(&mut self) -> anyhow::Result<()> {
199        if self.actor_status_events_buffer.len() >= self.batch_size {
200            Self::flush_buffer(
201                &mut self.actor_status_events_buffer,
202                "actor_status_events",
203                &self.flush_callback,
204            )?;
205        }
206        Ok(())
207    }
208
209    fn flush_sent_messages_if_full(&mut self) -> anyhow::Result<()> {
210        if self.sent_messages_buffer.len() >= self.batch_size {
211            Self::flush_buffer(
212                &mut self.sent_messages_buffer,
213                "sent_messages",
214                &self.flush_callback,
215            )?;
216        }
217        Ok(())
218    }
219
220    fn flush_messages_if_full(&mut self) -> anyhow::Result<()> {
221        if self.messages_buffer.len() >= self.batch_size {
222            Self::flush_buffer(&mut self.messages_buffer, "messages", &self.flush_callback)?;
223        }
224        Ok(())
225    }
226
227    fn flush_message_status_events_if_full(&mut self) -> anyhow::Result<()> {
228        if self.message_status_events_buffer.len() >= self.batch_size {
229            Self::flush_buffer(
230                &mut self.message_status_events_buffer,
231                "message_status_events",
232                &self.flush_callback,
233            )?;
234        }
235        Ok(())
236    }
237}
238
/// Dispatches entity lifecycle events to Arrow RecordBatches.
///
/// This is separate from RecordBatchSink which handles tracing events (spans, events).
/// Both use the same FlushCallback pattern to push batches to the database scanner's tables.
///
/// Cloning is cheap: clones share the same buffers and callback through the
/// inner `Arc`, and all access is serialized by the `Mutex`.
#[derive(Clone)]
pub struct EntityDispatcher {
    /// Shared, lock-protected buffers and flush configuration.
    inner: Arc<Mutex<EntityDispatcherInner>>,
}
247
248impl EntityDispatcher {
249    /// Create a new EntityDispatcher with the specified batch size and flush callback.
250    ///
251    /// The callback receives (table_name, record_batch) when a batch is ready.
252    /// The callback should handle empty batches by creating the table with the
253    /// schema but not appending the empty data.
254    ///
255    /// # Arguments
256    /// * `batch_size` - Number of rows to buffer before flushing each table
257    /// * `flush_callback` - Called with (table_name, record_batch) when a batch is ready
258    pub fn new(batch_size: usize, flush_callback: FlushCallback) -> Self {
259        let inner = Arc::new(Mutex::new(EntityDispatcherInner {
260            actors_buffer: ActorBuffer::default(),
261            meshes_buffer: MeshBuffer::default(),
262            actor_status_events_buffer: ActorStatusEventBuffer::default(),
263            sent_messages_buffer: SentMessageBuffer::default(),
264            messages_buffer: MessageBuffer::default(),
265            message_status_events_buffer: MessageStatusEventBuffer::default(),
266            batch_size,
267            flush_callback,
268        }));
269        Self { inner }
270    }
271
272    /// Flush all buffers, emitting batches for actors and meshes tables.
273    ///
274    /// This always emits batches for both tables, even if they are empty.
275    /// The callback is expected to handle empty batches by creating the table
276    /// with the correct schema but not appending empty data.
277    pub fn flush(&self) -> anyhow::Result<()> {
278        let mut inner = self
279            .inner
280            .lock()
281            .map_err(|_| anyhow::anyhow!("lock poisoned"))?;
282        inner.flush()
283    }
284}
285
286impl EntityEventDispatcher for EntityDispatcher {
287    fn dispatch(&self, event: EntityEvent) -> Result<(), anyhow::Error> {
288        let mut inner = self
289            .inner
290            .lock()
291            .map_err(|_| anyhow::anyhow!("lock poisoned"))?;
292
293        match event {
294            EntityEvent::Actor(actor_event) => {
295                inner.actors_buffer.insert(Actor {
296                    id: actor_event.id,
297                    timestamp_us: timestamp_to_micros(&actor_event.timestamp),
298                    mesh_id: actor_event.mesh_id,
299                    rank: actor_event.rank,
300                    full_name: actor_event.full_name,
301                    display_name: actor_event.display_name,
302                });
303                inner.flush_actors_if_full()?;
304            }
305            EntityEvent::Mesh(mesh_event) => {
306                inner.meshes_buffer.insert(Mesh {
307                    id: mesh_event.id,
308                    timestamp_us: timestamp_to_micros(&mesh_event.timestamp),
309                    class: mesh_event.class,
310                    given_name: mesh_event.given_name,
311                    full_name: mesh_event.full_name,
312                    shape_json: mesh_event.shape_json,
313                    parent_mesh_id: mesh_event.parent_mesh_id,
314                    parent_view_json: mesh_event.parent_view_json,
315                });
316                inner.flush_meshes_if_full()?;
317            }
318            EntityEvent::ActorStatus(status_event) => {
319                inner.actor_status_events_buffer.insert(ActorStatusEvent {
320                    id: status_event.id,
321                    timestamp_us: timestamp_to_micros(&status_event.timestamp),
322                    actor_id: status_event.actor_id,
323                    new_status: status_event.new_status,
324                    reason: status_event.reason,
325                });
326                inner.flush_actor_status_events_if_full()?;
327            }
328            EntityEvent::SentMessage(event) => {
329                inner.sent_messages_buffer.insert(SentMessage {
330                    id: hyperactor_telemetry::generate_sent_message_id(event.sender_actor_id),
331                    timestamp_us: timestamp_to_micros(&event.timestamp),
332                    sender_actor_id: event.sender_actor_id,
333                    actor_mesh_id: event.actor_mesh_id,
334                    view_json: event.view_json,
335                    shape_json: event.shape_json,
336                });
337                inner.flush_sent_messages_if_full()?;
338            }
339            EntityEvent::Message(event) => {
340                inner.messages_buffer.insert(Message {
341                    id: event.id,
342                    timestamp_us: timestamp_to_micros(&event.timestamp),
343                    from_actor_id: event.from_actor_id,
344                    to_actor_id: event.to_actor_id,
345                    endpoint: event.endpoint,
346                    port_id: event.port_id,
347                });
348                inner.flush_messages_if_full()?;
349            }
350            EntityEvent::MessageStatus(event) => {
351                inner
352                    .message_status_events_buffer
353                    .insert(MessageStatusEvent {
354                        id: event.id,
355                        timestamp_us: timestamp_to_micros(&event.timestamp),
356                        message_id: event.message_id,
357                        status: event.status,
358                    });
359                inner.flush_message_status_events_if_full()?;
360            }
361        }
362        Ok(())
363    }
364}