monarch_rdma/
rdma_components.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! # RDMA Components
10//!
11//! This module provides the core RDMA building blocks for establishing and managing RDMA connections.
12//!
13//! ## Core Components
14//!
15//! * `IbvDomain` - Manages RDMA resources including context, protection domain, and memory region
16//! * `IbvQueuePair` - Handles communication between endpoints via queue pairs and completion queues
17//!
18//! ## RDMA Overview
19//!
20//! Remote Direct Memory Access (RDMA) allows direct memory access from the memory of one computer
21//! into the memory of another without involving either computer's operating system. This permits
22//! high-throughput, low-latency networking with minimal CPU overhead.
23//!
24//! ## Connection Architecture
25//!
26//! The module manages the following ibverbs primitives:
27//!
28//! 1. **Queue Pairs (QP)**: Each connection has a send queue and a receive queue
29//! 2. **Completion Queues (CQ)**: Events are reported when operations complete
30//! 3. **Memory Regions (MR)**: Memory must be registered with the RDMA device before use
31//! 4. **Protection Domains (PD)**: Provide isolation between different connections
32//!
33//! ## Connection Lifecycle
34//!
35//! 1. Create an `IbvDomain` with `new()`
36//! 2. Create an `IbvQueuePair` from the domain
37//! 3. Exchange connection info with remote peer (application must handle this)
38//! 4. Connect to remote endpoint with `connect()`
39//! 5. Perform RDMA operations (read/write)
40//! 6. Poll for completions
41//! 7. Resources are cleaned up when dropped
42
43/// Maximum size for a single RDMA operation in bytes (1 GiB)
44use std::fs;
45use std::result::Result;
46use std::sync::Arc;
47use std::time::Duration;
48
49use hyperactor::ActorHandle;
50use hyperactor::ActorRef;
51use hyperactor::context;
52use hyperactor::reference;
53use serde::Deserialize;
54use serde::Serialize;
55use typeuri::Named;
56
57use crate::RdmaManagerActor;
58use crate::RdmaOp;
59use crate::RdmaOpType;
60use crate::ReleaseBufferClient;
61use crate::backend::RdmaBackend;
62use crate::backend::RdmaRemoteBackendContext;
63use crate::backend::ibverbs::IbvBuffer;
64use crate::backend::ibverbs::manager_actor::IbvManagerActor;
65use crate::backend::ibverbs::manager_actor::IbvManagerMessageClient;
66use crate::backend::tcp::manager_actor::TcpManagerActor;
67use crate::local_memory::RdmaLocalMemory;
68
69/// Lightweight handle representing a registered RDMA buffer.
70///
71/// Contains an id for the buffer registration, the buffer size, a reference
72/// to the owning [`RdmaManagerActor`], and backend-specific contexts for
73/// performing RDMA operations.
74#[derive(Debug, Named, Clone, Serialize, Deserialize)]
75pub struct RdmaRemoteBuffer {
76    pub id: usize,
77    pub size: usize,
78    pub owner: reference::ActorRef<RdmaManagerActor>,
79    pub backends: Vec<RdmaRemoteBackendContext>,
80}
81wirevalue::register_type!(RdmaRemoteBuffer);
82
83/// Backend handle returned by [`RdmaRemoteBuffer::choose_backend`].
84///
85/// `RdmaBackend` is not object-safe (associated type + generic parameter
86/// on `submit`), so we use an enum that delegates to the concrete handle.
87#[derive(Debug)]
88pub enum RdmaLocalBackend {
89    Ibv(ActorHandle<IbvManagerActor>),
90    Tcp(ActorHandle<TcpManagerActor>),
91}
92
93impl RdmaLocalBackend {
94    async fn submit(
95        &mut self,
96        cx: &(impl context::Actor + Send + Sync),
97        ops: Vec<RdmaOp>,
98        timeout: Duration,
99    ) -> Result<(), anyhow::Error> {
100        match self {
101            RdmaLocalBackend::Ibv(h) => h.submit(cx, ops, timeout).await,
102            RdmaLocalBackend::Tcp(h) => h.submit(cx, ops, timeout).await,
103        }
104    }
105}
106
107impl RdmaRemoteBuffer {
108    /// Choose the best available backend for this buffer.
109    ///
110    /// Prefers ibverbs when both the local and remote sides support it.
111    /// Falls back to TCP when ibverbs is unavailable and
112    /// [`RDMA_ALLOW_TCP_FALLBACK`](crate::config::RDMA_ALLOW_TCP_FALLBACK)
113    /// is enabled.
114    pub async fn choose_backend(
115        &self,
116        client: &(impl context::Actor + Send + Sync),
117    ) -> Result<RdmaLocalBackend, anyhow::Error> {
118        if self.has_ibverbs_backend() {
119            if let Ok(ibv_backend) = IbvManagerActor::local_handle(client).await {
120                return Ok(RdmaLocalBackend::Ibv(ibv_backend));
121            }
122
123            return self
124                .tcp_fallback_or_bail("no ibverbs backend on the local side", client)
125                .await;
126        }
127
128        self.tcp_fallback_or_bail(
129            &format!(
130                "no ibverbs backend on the remote side (owner={})",
131                self.owner.actor_id()
132            ),
133            client,
134        )
135        .await
136    }
137
138    /// Push data from local memory into this remote buffer (local->remote).
139    pub async fn write_from_local(
140        &self,
141        client: &(impl context::Actor + Send + Sync),
142        local: Arc<dyn RdmaLocalMemory>,
143        timeout: u64,
144    ) -> Result<bool, anyhow::Error> {
145        let mut backend = self.choose_backend(client).await?;
146        backend
147            .submit(
148                client,
149                vec![RdmaOp {
150                    op_type: RdmaOpType::WriteFromLocal,
151                    local,
152                    remote: self.clone(),
153                }],
154                Duration::from_secs(timeout),
155            )
156            .await?;
157        Ok(true)
158    }
159
160    /// Pull data from this remote buffer into local memory (remote->local).
161    pub async fn read_into_local(
162        &self,
163        client: &(impl context::Actor + Send + Sync),
164        local: Arc<dyn RdmaLocalMemory>,
165        timeout: u64,
166    ) -> Result<bool, anyhow::Error> {
167        let mut backend = self.choose_backend(client).await?;
168        backend
169            .submit(
170                client,
171                vec![RdmaOp {
172                    op_type: RdmaOpType::ReadIntoLocal,
173                    local,
174                    remote: self.clone(),
175                }],
176                Duration::from_secs(timeout),
177            )
178            .await?;
179        Ok(true)
180    }
181
182    /// Get a TCP backend handle, or bail if TCP fallback is disabled.
183    async fn tcp_fallback_or_bail(
184        &self,
185        reason: &str,
186        client: &(impl context::Actor + Send + Sync),
187    ) -> Result<RdmaLocalBackend, anyhow::Error> {
188        if !hyperactor_config::global::get(crate::config::RDMA_ALLOW_TCP_FALLBACK) {
189            anyhow::bail!(
190                "{reason}, and TCP fallback is disabled; \
191                 enable it with monarch.configure(rdma_allow_tcp_fallback=True)"
192            );
193        }
194
195        tracing::warn!("falling back to TCP transport ({reason})");
196
197        let tcp_backend = TcpManagerActor::local_handle(client).await?;
198        Ok(RdmaLocalBackend::Tcp(tcp_backend))
199    }
200
201    /// Drop the buffer and release remote handles.
202    pub async fn drop_buffer(&self, client: &impl context::Actor) -> Result<(), anyhow::Error> {
203        tracing::debug!("[buffer] dropping buffer id={}", self.id);
204        self.owner.release_buffer(client, self.id).await?;
205        Ok(())
206    }
207
208    /// Whether this buffer has an ibverbs backend context.
209    fn has_ibverbs_backend(&self) -> bool {
210        self.backends
211            .iter()
212            .any(|b| matches!(b, RdmaRemoteBackendContext::Ibverbs(..)))
213    }
214
215    /// Resolve the ibverbs backend context for this buffer.
216    ///
217    /// Returns `None` if the buffer has no ibverbs backend context (i.e.,
218    /// the remote side was created without ibverbs). Returns `Some(Err(...))`
219    /// if the context exists but lazy MR resolution fails. Returns
220    /// `Some(Ok(...))` on success.
221    pub async fn resolve_ibv(
222        &self,
223        client: &impl context::Actor,
224    ) -> Option<Result<(reference::ActorRef<IbvManagerActor>, IbvBuffer), anyhow::Error>> {
225        let (remote_ibv_mgr, remote_ibv_buf) = self.backends.iter().find_map(|b| match b {
226            RdmaRemoteBackendContext::Ibverbs(mgr, buf) => Some((mgr, buf)),
227            _ => None,
228        })?;
229
230        Some(
231            remote_ibv_buf
232                .get_or_try_init(async || {
233                    remote_ibv_mgr
234                        .request_buffer(client, self.id)
235                        .await?
236                        .ok_or_else(|| anyhow::anyhow!("buffer {} not found", self.id))
237                })
238                .await
239                .cloned()
240                .map(|buf| (remote_ibv_mgr.clone(), buf)),
241        )
242    }
243
244    /// Extract the TCP backend context from this buffer.
245    ///
246    /// Unlike [`resolve_ibv`], no lazy initialization is needed -- the
247    /// TCP backend only needs the remote actor ref and the buffer id.
248    pub fn resolve_tcp(&self) -> Result<(ActorRef<TcpManagerActor>, usize), anyhow::Error> {
249        self.backends
250            .iter()
251            .find_map(|b| match b {
252                RdmaRemoteBackendContext::Tcp(tcp_ref) => Some((tcp_ref.clone(), self.id)),
253                _ => None,
254            })
255            .ok_or_else(|| anyhow::anyhow!("tcp backend not found for buffer: {:?}", self))
256    }
257}
258
259/// Utility to validate execution context.
260///
261/// Remote Execution environments do not always have access to the nvidia_peermem module
262/// and/or set the PeerMappingOverride parameter due to security. This function can be
263/// used to validate that the execution context when running operations that need this
264/// functionality (ie. cudaHostRegisterIoMemory).
265///
266/// # Returns
267///
268/// * `Ok(())` if the execution context is valid
269/// * `Err(anyhow::Error)` if the execution context is invalid
270pub async fn validate_execution_context() -> Result<(), anyhow::Error> {
271    // Check for nvidia peermem
272    match fs::read_to_string("/proc/modules") {
273        Ok(contents) => {
274            if !contents.contains("nvidia_peermem") {
275                return Err(anyhow::anyhow!(
276                    "nvidia_peermem module not found in /proc/modules"
277                ));
278            }
279        }
280        Err(e) => {
281            return Err(anyhow::anyhow!(e));
282        }
283    }
284
285    // Test file access to nvidia params
286    match fs::read_to_string("/proc/driver/nvidia/params") {
287        Ok(contents) => {
288            if !contents.contains("PeerMappingOverride=1") {
289                return Err(anyhow::anyhow!(
290                    "PeerMappingOverride=1 not found in /proc/driver/nvidia/params"
291                ));
292            }
293        }
294        Err(e) => {
295            return Err(anyhow::anyhow!(e));
296        }
297    }
298    Ok(())
299}
300
301/// Get all segments that have been registered with MRs
302///
303/// # Returns
304/// * `Vec<SegmentInfo>` - Vector containing all registered segment information
305pub fn get_registered_cuda_segments() -> Vec<rdmaxcel_sys::rdma_segment_info_t> {
306    unsafe {
307        let segment_count = rdmaxcel_sys::rdma_get_active_segment_count();
308        if segment_count <= 0 {
309            return Vec::new();
310        }
311
312        let mut segments = vec![
313            std::mem::MaybeUninit::<rdmaxcel_sys::rdma_segment_info_t>::zeroed()
314                .assume_init();
315            segment_count as usize
316        ];
317        let actual_count =
318            rdmaxcel_sys::rdma_get_all_segment_info(segments.as_mut_ptr(), segment_count);
319
320        if actual_count > 0 {
321            segments.truncate(actual_count as usize);
322            segments
323        } else {
324            Vec::new()
325        }
326    }
327}
328
329/// Segment scanner callback type alias for convenience.
330pub type SegmentScannerFn = rdmaxcel_sys::RdmaxcelSegmentScannerFn;
331
332/// Register a segment scanner callback.
333///
334/// The scanner callback is called during RDMA segment registration to discover
335/// CUDA memory segments. The callback should fill the provided buffer with
336/// segment information and return the total count of segments found.
337///
338/// If the returned count exceeds the buffer size, the caller will allocate
339/// a larger buffer and retry.
340///
341/// Pass `None` to unregister the scanner.
342///
343/// # Safety
344///
345/// The provided callback function must be safe to call from C code and must
346/// properly handle the segment buffer.
347pub fn register_segment_scanner(scanner: SegmentScannerFn) {
348    // SAFETY: We are registering a callback function pointer with rdmaxcel.
349    unsafe { rdmaxcel_sys::rdmaxcel_register_segment_scanner(scanner) }
350}