monarch_rdma/
rdma_components.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! # RDMA Components
10//!
11//! This module provides the core RDMA building blocks for establishing and managing RDMA connections.
12//!
13//! ## Core Components
14//!
15//! * `IbvDomain` - Manages RDMA resources including context, protection domain, and memory region
16//! * `IbvQueuePair` - Handles communication between endpoints via queue pairs and completion queues
17//!
18//! ## RDMA Overview
19//!
20//! Remote Direct Memory Access (RDMA) allows direct memory access from the memory of one computer
21//! into the memory of another without involving either computer's operating system. This permits
22//! high-throughput, low-latency networking with minimal CPU overhead.
23//!
24//! ## Connection Architecture
25//!
26//! The module manages the following ibverbs primitives:
27//!
28//! 1. **Queue Pairs (QP)**: Each connection has a send queue and a receive queue
29//! 2. **Completion Queues (CQ)**: Events are reported when operations complete
30//! 3. **Memory Regions (MR)**: Memory must be registered with the RDMA device before use
31//! 4. **Protection Domains (PD)**: Provide isolation between different connections
32//!
33//! ## Connection Lifecycle
34//!
35//! 1. Create an `IbvDomain` with `new()`
36//! 2. Create an `IbvQueuePair` from the domain
37//! 3. Exchange connection info with remote peer (application must handle this)
38//! 4. Connect to remote endpoint with `connect()`
39//! 5. Perform RDMA operations (read/write)
40//! 6. Poll for completions
41//! 7. Resources are cleaned up when dropped
42
// NOTE(review): stray doc comment -- it describes a "maximum size for a single
// RDMA operation (1 GiB)" constant that is not defined in this file; confirm
// where that constant lives and remove or reattach this comment.
44use std::fs;
45use std::result::Result;
46use std::sync::Arc;
47use std::time::Duration;
48
49use hyperactor::ActorRef;
50use hyperactor::context;
51use hyperactor::reference;
52use serde::Deserialize;
53use serde::Serialize;
54use typeuri::Named;
55
56use crate::RdmaManagerActor;
57use crate::RdmaOp;
58use crate::RdmaOpType;
59use crate::ReleaseBufferClient;
60use crate::backend::RdmaBackend;
61use crate::backend::RdmaRemoteBackendContext;
62use crate::backend::ibverbs::IbvBuffer;
63use crate::backend::ibverbs::manager_actor::IbvBackend;
64use crate::backend::ibverbs::manager_actor::IbvManagerActor;
65use crate::backend::ibverbs::manager_actor::IbvManagerMessageClient;
66use crate::backend::tcp::manager_actor::TcpBackend;
67use crate::backend::tcp::manager_actor::TcpManagerActor;
68use crate::local_memory::RdmaLocalMemory;
69
/// Lightweight handle representing a registered RDMA buffer.
///
/// Contains an id for the buffer registration, the buffer size, a reference
/// to the owning [`RdmaManagerActor`], and backend-specific contexts for
/// performing RDMA operations.
#[derive(Debug, Named, Clone, Serialize, Deserialize)]
pub struct RdmaRemoteBuffer {
    // Registration id on the owning manager; used when requesting or
    // releasing the buffer remotely.
    pub id: usize,
    // Size of the registered buffer in bytes.
    pub size: usize,
    // Actor that owns the registration; `drop_buffer` releases through it.
    pub owner: reference::ActorRef<RdmaManagerActor>,
    // Backend-specific contexts (ibverbs and/or TCP) through which this
    // buffer can be reached; `choose_backend` picks among them.
    pub backends: Vec<RdmaRemoteBackendContext>,
}
wirevalue::register_type!(RdmaRemoteBuffer);
83
/// Backend handle returned by [`RdmaRemoteBuffer::choose_backend`].
///
/// `RdmaBackend` is not object-safe (associated type + generic parameter
/// on `submit`), so we use an enum that delegates to the concrete handle.
#[derive(Debug)]
pub enum RdmaLocalBackend {
    // ibverbs transport handle; preferred when both sides support it.
    Ibv(IbvBackend),
    // TCP transport handle; used as a fallback when ibverbs is unavailable
    // and the fallback config flag is enabled.
    Tcp(TcpBackend),
}
93
94impl RdmaLocalBackend {
95    async fn submit(
96        &mut self,
97        cx: &(impl context::Actor + Send + Sync),
98        ops: Vec<RdmaOp>,
99        timeout: Duration,
100    ) -> Result<(), anyhow::Error> {
101        match self {
102            RdmaLocalBackend::Ibv(h) => h.submit(cx, ops, timeout).await,
103            RdmaLocalBackend::Tcp(h) => h.submit(cx, ops, timeout).await,
104        }
105    }
106}
107
108impl RdmaRemoteBuffer {
109    /// Choose the best available backend for this buffer.
110    ///
111    /// Prefers ibverbs when both the local and remote sides support it.
112    /// Falls back to TCP when ibverbs is unavailable and
113    /// [`RDMA_ALLOW_TCP_FALLBACK`](crate::config::RDMA_ALLOW_TCP_FALLBACK)
114    /// is enabled.
115    pub async fn choose_backend(
116        &self,
117        client: &(impl context::Actor + Send + Sync),
118    ) -> Result<RdmaLocalBackend, anyhow::Error> {
119        if self.has_ibverbs_backend() {
120            if let Ok(ibv_handle) = IbvManagerActor::local_handle(client).await {
121                return Ok(RdmaLocalBackend::Ibv(IbvBackend(ibv_handle)));
122            }
123
124            return self
125                .tcp_fallback_or_bail("no ibverbs backend on the local side", client)
126                .await;
127        }
128
129        self.tcp_fallback_or_bail(
130            &format!(
131                "no ibverbs backend on the remote side (owner={})",
132                self.owner.actor_id()
133            ),
134            client,
135        )
136        .await
137    }
138
139    /// Push data from local memory into this remote buffer (local->remote).
140    pub async fn write_from_local(
141        &self,
142        client: &(impl context::Actor + Send + Sync),
143        local: Arc<dyn RdmaLocalMemory>,
144        timeout: u64,
145    ) -> Result<bool, anyhow::Error> {
146        let mut backend = self.choose_backend(client).await?;
147        backend
148            .submit(
149                client,
150                vec![RdmaOp {
151                    op_type: RdmaOpType::WriteFromLocal,
152                    local,
153                    remote: self.clone(),
154                }],
155                Duration::from_secs(timeout),
156            )
157            .await?;
158        Ok(true)
159    }
160
161    /// Pull data from this remote buffer into local memory (remote->local).
162    pub async fn read_into_local(
163        &self,
164        client: &(impl context::Actor + Send + Sync),
165        local: Arc<dyn RdmaLocalMemory>,
166        timeout: u64,
167    ) -> Result<bool, anyhow::Error> {
168        let mut backend = self.choose_backend(client).await?;
169        backend
170            .submit(
171                client,
172                vec![RdmaOp {
173                    op_type: RdmaOpType::ReadIntoLocal,
174                    local,
175                    remote: self.clone(),
176                }],
177                Duration::from_secs(timeout),
178            )
179            .await?;
180        Ok(true)
181    }
182
183    /// Get a TCP backend handle, or bail if TCP fallback is disabled.
184    async fn tcp_fallback_or_bail(
185        &self,
186        reason: &str,
187        client: &(impl context::Actor + Send + Sync),
188    ) -> Result<RdmaLocalBackend, anyhow::Error> {
189        if !hyperactor_config::global::get(crate::config::RDMA_ALLOW_TCP_FALLBACK) {
190            anyhow::bail!(
191                "{reason}, and TCP fallback is disabled; \
192                 enable it with monarch.configure(rdma_allow_tcp_fallback=True)"
193            );
194        }
195
196        tracing::warn!("falling back to TCP transport ({reason})");
197
198        let tcp_handle = TcpManagerActor::local_handle(client).await?;
199        Ok(RdmaLocalBackend::Tcp(TcpBackend(tcp_handle)))
200    }
201
202    /// Drop the buffer and release remote handles.
203    pub async fn drop_buffer(&self, client: &impl context::Actor) -> Result<(), anyhow::Error> {
204        tracing::debug!("[buffer] dropping buffer id={}", self.id);
205        self.owner.release_buffer(client, self.id).await?;
206        Ok(())
207    }
208
209    /// Whether this buffer has an ibverbs backend context.
210    fn has_ibverbs_backend(&self) -> bool {
211        self.backends
212            .iter()
213            .any(|b| matches!(b, RdmaRemoteBackendContext::Ibverbs(..)))
214    }
215
216    /// Resolve the ibverbs backend context for this buffer.
217    ///
218    /// Returns `None` if the buffer has no ibverbs backend context (i.e.,
219    /// the remote side was created without ibverbs). Returns `Some(Err(...))`
220    /// if the context exists but lazy MR resolution fails. Returns
221    /// `Some(Ok(...))` on success.
222    pub async fn resolve_ibv(
223        &self,
224        client: &impl context::Actor,
225    ) -> Option<Result<(reference::ActorRef<IbvManagerActor>, IbvBuffer), anyhow::Error>> {
226        let (remote_ibv_mgr, remote_ibv_buf) = self.backends.iter().find_map(|b| match b {
227            RdmaRemoteBackendContext::Ibverbs(mgr, buf) => Some((mgr, buf)),
228            _ => None,
229        })?;
230
231        Some(
232            remote_ibv_buf
233                .get_or_try_init(async || {
234                    remote_ibv_mgr
235                        .request_buffer(client, self.id)
236                        .await?
237                        .ok_or_else(|| anyhow::anyhow!("buffer {} not found", self.id))
238                })
239                .await
240                .cloned()
241                .map(|buf| (remote_ibv_mgr.clone(), buf)),
242        )
243    }
244
245    /// Extract the TCP backend context from this buffer.
246    ///
247    /// Unlike [`resolve_ibv`], no lazy initialization is needed -- the
248    /// TCP backend only needs the remote actor ref and the buffer id.
249    pub fn resolve_tcp(&self) -> Result<(ActorRef<TcpManagerActor>, usize), anyhow::Error> {
250        self.backends
251            .iter()
252            .find_map(|b| match b {
253                RdmaRemoteBackendContext::Tcp(tcp_ref) => Some((tcp_ref.clone(), self.id)),
254                _ => None,
255            })
256            .ok_or_else(|| anyhow::anyhow!("tcp backend not found for buffer: {:?}", self))
257    }
258}
259
260/// Utility to validate execution context.
261///
262/// Remote Execution environments do not always have access to the nvidia_peermem module
263/// and/or set the PeerMappingOverride parameter due to security. This function can be
264/// used to validate that the execution context when running operations that need this
265/// functionality (ie. cudaHostRegisterIoMemory).
266///
267/// # Returns
268///
269/// * `Ok(())` if the execution context is valid
270/// * `Err(anyhow::Error)` if the execution context is invalid
271pub async fn validate_execution_context() -> Result<(), anyhow::Error> {
272    // Check for nvidia peermem
273    match fs::read_to_string("/proc/modules") {
274        Ok(contents) => {
275            if !contents.contains("nvidia_peermem") {
276                return Err(anyhow::anyhow!(
277                    "nvidia_peermem module not found in /proc/modules"
278                ));
279            }
280        }
281        Err(e) => {
282            return Err(anyhow::anyhow!(e));
283        }
284    }
285
286    // Test file access to nvidia params
287    match fs::read_to_string("/proc/driver/nvidia/params") {
288        Ok(contents) => {
289            if !contents.contains("PeerMappingOverride=1") {
290                return Err(anyhow::anyhow!(
291                    "PeerMappingOverride=1 not found in /proc/driver/nvidia/params"
292                ));
293            }
294        }
295        Err(e) => {
296            return Err(anyhow::anyhow!(e));
297        }
298    }
299    Ok(())
300}
301
/// Get all segments that have been registered with MRs for the given PD.
///
/// Each protection domain maintains independent segment registrations, so
/// callers must pass the PD whose lkeys they intend to use.
///
/// Returns an empty `Vec` when no segments are active or when the FFI query
/// reports a non-positive count.
pub fn get_registered_cuda_segments(
    pd: *mut rdmaxcel_sys::ibv_pd,
) -> Vec<rdmaxcel_sys::rdma_segment_info_t> {
    // SAFETY: `pd` must be a valid protection-domain pointer for the FFI
    // calls below. The zeroed `rdma_segment_info_t` values are presumed to
    // be plain C structs that are valid in an all-zero state -- TODO confirm
    // against the rdmaxcel ABI.
    unsafe {
        // First ask how many segments exist so we can size the output buffer.
        let segment_count = rdmaxcel_sys::rdma_get_active_segment_count(pd);
        if segment_count <= 0 {
            return Vec::new();
        }

        // Pre-allocate zero-initialized storage for the FFI call to fill in.
        let mut segments = vec![
            std::mem::MaybeUninit::<rdmaxcel_sys::rdma_segment_info_t>::zeroed()
                .assume_init();
            segment_count as usize
        ];
        let actual_count = rdmaxcel_sys::rdma_get_all_registered_segment_info(
            pd,
            segments.as_mut_ptr(),
            segment_count,
        );

        // Trim to the number actually written; a non-positive return means
        // the second query produced nothing usable, so report no segments.
        if actual_count > 0 {
            segments.truncate(actual_count as usize);
            segments
        } else {
            Vec::new()
        }
    }
}
334
/// Segment scanner callback type alias for convenience.
///
/// Re-exports rdmaxcel's scanner callback type; see
/// [`register_segment_scanner`] for the expected callback contract.
pub type SegmentScannerFn = rdmaxcel_sys::RdmaxcelSegmentScannerFn;
337
/// Register a segment scanner callback.
///
/// The scanner callback is called during RDMA segment registration to discover
/// CUDA memory segments. The callback should fill the provided buffer with
/// segment information and return the total count of segments found.
///
/// If the returned count exceeds the buffer size, the caller will allocate
/// a larger buffer and retry.
///
/// Pass `None` to unregister the scanner.
///
/// # Safety
///
/// The provided callback function must be safe to call from C code and must
/// properly handle the segment buffer.
pub fn register_segment_scanner(scanner: SegmentScannerFn) {
    // SAFETY: We are registering a callback function pointer with rdmaxcel.
    // NOTE(review): soundness ultimately depends on the caller honoring the
    // `# Safety` contract above even though this fn is not marked `unsafe` --
    // consider whether it should be an `unsafe fn` (interface change, so left
    // as-is here).
    unsafe { rdmaxcel_sys::rdmaxcel_register_segment_scanner(scanner) }
}