Skip to main content

monarch_hyperactor/code_sync/
rsync.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::io::ErrorKind;
10use std::net::SocketAddr;
11use std::path::Path;
12use std::path::PathBuf;
13use std::process::Stdio;
14use std::time::Duration;
15
16use anyhow::Context;
17use anyhow::Result;
18use anyhow::bail;
19use anyhow::ensure;
20use async_trait::async_trait;
21use futures::FutureExt;
22use futures::StreamExt;
23use futures::TryFutureExt;
24use futures::TryStreamExt;
25use futures::try_join;
26use hyperactor as reference;
27use hyperactor::Actor;
28use hyperactor::Bind;
29use hyperactor::Endpoint as _;
30use hyperactor::Handler;
31use hyperactor::Unbind;
32use hyperactor::context;
33use hyperactor_mesh::ActorMesh;
34use hyperactor_mesh::connect::Connect;
35use hyperactor_mesh::connect::accept;
36#[cfg(feature = "packaged_rsync")]
37use lazy_static::lazy_static;
38use nix::sys::signal;
39use nix::sys::signal::Signal;
40use nix::unistd::Pid;
41use serde::Deserialize;
42use serde::Serialize;
43use tempfile::TempDir;
44#[cfg(feature = "packaged_rsync")]
45use tempfile::TempPath;
46use tokio::fs;
47use tokio::net::TcpListener;
48use tokio::net::TcpStream;
49use tokio::process::Child;
50use tokio::process::Command;
51#[cfg(feature = "packaged_rsync")]
52use tokio::sync::OnceCell;
53use tracing::warn;
54use typeuri::Named;
55
56use crate::code_sync::WorkspaceLocation;
57
// Process-wide, lazily-initialized cell holding the path of the rsync binary
// that `get_rsync_bin_path` extracts from the embedded `rsync.bin` payload.
// Only compiled when the `packaged_rsync` feature is enabled.
// NOTE(review): presumably the `TempPath` is kept in a static so the extracted
// file stays on disk for the lifetime of the process -- confirm against the
// `tempfile` drop semantics.
#[cfg(feature = "packaged_rsync")]
lazy_static! {
    static ref RSYNC_BIN_PATH: OnceCell<TempPath> = OnceCell::new();
}
62
63async fn get_rsync_bin_path() -> Result<&'static Path> {
64    #[cfg(feature = "packaged_rsync")]
65    {
66        use std::io::Write;
67        use std::os::unix::fs::PermissionsExt;
68        Ok(RSYNC_BIN_PATH
69            .get_or_try_init(|| async {
70                tokio::task::spawn_blocking(|| {
71                    let mut tmp = tempfile::NamedTempFile::with_prefix("rsync.")?;
72                    let rsync_bin = include_bytes!("rsync.bin");
73                    tmp.write_all(rsync_bin)?;
74                    let bin_path = tmp.into_temp_path();
75                    std::fs::set_permissions(&bin_path, std::fs::Permissions::from_mode(0o755))?;
76                    anyhow::Ok(bin_path)
77                })
78                .await?
79            })
80            .await?)
81    }
82    #[cfg(not(feature = "packaged_rsync"))]
83    {
84        Ok(Path::new("rsync"))
85    }
86}
87
/// Represents a single file change from rsync, i.e. one parsed line of its
/// `--itemize-changes` output.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Change {
    /// The type of change that occurred
    pub change_type: ChangeType,
    /// The path of the file that changed, exactly as printed by rsync
    /// (directories keep their trailing `/`).
    pub path: PathBuf,
}
96
/// A message-style itemize entry: a line whose first character is `*`, where
/// the remainder of the column is a word rather than attribute flags.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeMessage {
    /// Path was deleted (parsed from `*deleting`).
    Deleting,
}
102
/// The update type of an itemize entry, taken from the first character of the
/// itemize column.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeAction {
    /// File was received (parsed from `>`).
    Received,
    /// Path was changed/created locally (parsed from `c`).
    LocalChange,
    /// Parsed from `.`.
    /// NOTE(review): presumably the item's content was not transferred, though
    /// its attributes may still have changed -- confirm against the rsync
    /// manual.
    NotTransferred,
}
111
/// The type of change that occurred to a file
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeType {
    /// A `*`-prefixed message entry such as `*deleting`.
    Message(ChangeMessage),
    /// A regular entry: what was done, and to which kind of filesystem object.
    Action(ChangeAction, FileType),
}
118
/// The type of file that changed, taken from the second character of the
/// itemize column.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum FileType {
    /// Regular file (parsed from `f`)
    File,
    /// Directory (parsed from `d`)
    Directory,
    /// Symbolic link (parsed from `L`)
    Symlink,
}
129
/// Represents the result of an rsync operation with details about what was transferred
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Named)]
pub struct RsyncResult {
    /// All changes that occurred during the rsync operation, in the order
    /// rsync printed them.
    pub changes: Vec<Change>,
}
// Registers the type with `wirevalue` (it is carried in the `result` port of
// `RsyncMessage`).
wirevalue::register_type!(RsyncResult);
137
138impl RsyncResult {
139    /// Create an empty rsync result
140    pub fn empty() -> Self {
141        Self {
142            changes: Vec::new(),
143        }
144    }
145
146    /// Parse rsync output to extract file transfer information
147    /// Since create_dir_all ensures the workspace exists, we can assume all stdout lines are changes
148    fn parse_from_output(stdout: &str) -> Result<Self> {
149        let mut changes = Vec::new();
150
151        // Parse stdout for file operations (when using --itemize-changes)
152        // All lines in stdout represent changes since the workspace directory exists
153
154        // rsync itemize format: YXcstpoguax path
155        // Y = update type (>, c, h, ., etc.)
156        // X = file type (f=file, d=directory, L=symlink, etc.)
157        for line in stdout.lines() {
158            let line = line.trim();
159            let (raw_changes, path) = line.split_at(11);
160            let raw_changes = raw_changes.trim();
161            let path = &path[1..]; // remove leading space
162
163            let mut iter = raw_changes.chars();
164            let change_type = match iter.next().context("missing change type")? {
165                '*' => ChangeType::Message(match iter.as_str() {
166                    "deleting" => ChangeMessage::Deleting,
167                    _ => bail!("unexpected change message: {}", raw_changes),
168                }),
169                c => {
170                    let atype = match c {
171                        '.' => ChangeAction::NotTransferred,
172                        '>' => ChangeAction::Received,
173                        'c' => ChangeAction::LocalChange,
174                        _ => bail!("unexpected change type: {}", raw_changes),
175                    };
176                    let file_type = match iter.next().context("missing file type")? {
177                        'f' => FileType::File,
178                        'd' => FileType::Directory,
179                        'L' => FileType::Symlink,
180                        _ => bail!("unexpected file type: {}", raw_changes),
181                    };
182                    ChangeType::Action(atype, file_type)
183                }
184            };
185
186            changes.push(Change {
187                change_type,
188                path: PathBuf::from(path),
189            });
190        }
191
192        Ok(Self { changes })
193    }
194}
195
196pub async fn do_rsync(addr: &SocketAddr, workspace: &Path) -> Result<RsyncResult> {
197    // Make sure the target workspace exists, mainly to avoid the "created director ..."
198    // line in rsync output.
199    fs::create_dir_all(workspace).await?;
200
201    let rsync_bin_path = get_rsync_bin_path().await?;
202    let output = Command::new(rsync_bin_path)
203        .arg("--archive")
204        .arg("--delete")
205        // Show detailed changes for each file
206        .arg("--itemize-changes")
207        // By setting these flags, we make `rsync` immune to multiple invocations
208        // targeting the same dir, which can happen if we don't take care to only
209        // allow one worker on a given host to do the `rsync`.
210        .arg("--delete-after")
211        .arg("--delay-updates")
212        .arg("--exclude=.rsync-tmp.*")
213        .arg(format!("--partial-dir=.rsync-tmp.{}", addr.port()))
214        .arg(format!("rsync://{}/workspace", addr))
215        .arg(format!("{}/", workspace.display()))
216        .stdout(Stdio::piped())
217        .stderr(Stdio::piped())
218        .output()
219        .await
220        .map_err(|e| {
221            if e.kind() == std::io::ErrorKind::NotFound {
222                anyhow::anyhow!(
223                    "rsync binary: '{}' does not exist. Please ensure 'rsync' is installed and available in PATH.",
224                    rsync_bin_path.display()
225                )
226            } else {
227                anyhow::anyhow!("failed to execute rsync: {}", e)
228            }
229        })?;
230
231    output
232        .status
233        .exit_ok()
234        .with_context(|| format!("rsync failed: {}", String::from_utf8_lossy(&output.stderr)))?;
235
236    RsyncResult::parse_from_output(&String::from_utf8(output.stdout)?)
237}
238
/// Handle to an `rsync --daemon` child process that serves a single read-only
/// `workspace` module.
#[derive(Debug)]
pub struct RsyncDaemon {
    /// The daemon process; spawned with `kill_on_drop(true)`.
    child: Child,
    /// Temporary directory holding the daemon's config file and its log.
    state: TempDir,
    /// The address the daemon was told to listen on.
    addr: SocketAddr,
}
245
246impl RsyncDaemon {
247    pub async fn spawn(listener: TcpListener, workspace: &Path) -> Result<Self> {
248        let state = TempDir::with_prefix("rsyncd.")?;
249
250        // Write rsync config file
251        // TODO(agallagher): We can setup a secrets file to provide some measure of
252        // security and prevent stray rsync calls from hitting the server.
253        let content = format!(
254            r#"\
255[workspace]
256    path = {workspace}
257    use chroot = no
258    list = no
259    read only = true
260    write only = false
261    uid = {uid}
262    hosts allow = localhost ip6-localhost
263"#,
264            workspace = workspace.display(),
265            uid = nix::unistd::getuid().as_raw(),
266        );
267        let config = state.path().join("rsync.config");
268        fs::write(&config, content).await?;
269
270        // Find free port.  This is potentially racy, as some process could
271        // potentially bind to this port in between now and when `rsync` starts up
272        // below.  But I'm not sure a better way to do this, as rsync doesn't appear
273        // to support `rsync --sockopts=SO_PORTREUSE` (to share this port we've
274        // reserved) or `--port=0` (to pick a free port -- it'll just always use
275        // 873).
276        let addr = listener.local_addr()?;
277        std::mem::drop(listener);
278
279        // Spawn the rsync daemon.
280        let mut child = Command::new(get_rsync_bin_path().await?)
281            .arg("--daemon")
282            .arg("--no-detach")
283            .arg(format!("--address={}", addr.ip()))
284            .arg(format!("--port={}", addr.port()))
285            .arg(format!("--config={}", config.display()))
286            .arg(format!("--log-file={}/log", state.path().display()))
287            .kill_on_drop(true)
288            .spawn()?;
289
290        // Wait until the rsync daemon is ready to connect via polling it (I tried polling
291        // the log file to wait for the "listening" log line, but that gets prevented *before*
292        // it actually starts the listening loop).
293        tokio::select! {
294            res = child.wait() => bail!("unexpected early exit: {:?}", res),
295            res = async {
296                loop {
297                    match TcpStream::connect(addr).await {
298                        Err(err) if err.kind() == ErrorKind::ConnectionRefused => {
299                            tokio::time::sleep(Duration::from_millis(1)).await
300                        }
301                        Err(err) => return Err(err.into()),
302                        Ok(_) => break,
303                    }
304                }
305                anyhow::Ok(())
306            } => res?,
307        }
308
309        Ok(Self { child, state, addr })
310    }
311
312    pub fn addr(&self) -> &SocketAddr {
313        &self.addr
314    }
315
316    pub async fn shutdown(mut self) -> Result<String> {
317        let logs = fs::read_to_string(self.state.path().join("log")).await;
318        let id = self.child.id().context("missing pid")?;
319        let pid = Pid::from_raw(id as i32);
320        signal::kill(pid, Signal::SIGINT)?;
321        let status = self.child.wait().await?;
322        // rsync exists with 20 when sent SIGINT.
323        ensure!(status.code() == Some(20));
324        Ok(logs?)
325    }
326}
327
/// Request cast to every `RsyncActor` by `rsync_mesh`, asking the actor to
/// sync a workspace and report the outcome.
#[derive(Debug, Clone, Named, Serialize, Deserialize, Bind, Unbind)]
pub struct RsyncMessage {
    /// The connect message to create a duplex bytestream with the client.
    pub connect: reference::PortRef<Connect>,
    /// A port to send back the rsync result or any errors.
    pub result: reference::PortRef<Result<RsyncResult, String>>,
    /// The location of the workspace to sync.
    pub workspace: WorkspaceLocation,
}
wirevalue::register_type!(RsyncMessage);
338
/// Stateless actor that handles `RsyncMessage` by running rsync against a
/// connection tunnelled back to the sender.
#[derive(Debug, Default)]
#[hyperactor::export(handlers = [RsyncMessage { cast = true }])]
#[hyperactor::spawnable]
pub struct RsyncActor {
    //workspace: WorkspaceLocation,
}

// No custom lifecycle behaviour: the trait's default methods are used.
impl Actor for RsyncActor {}
347
#[async_trait]
impl Handler<RsyncMessage> for RsyncActor {
    /// Syncs the requested workspace and posts the outcome to `result`.
    ///
    /// Steps, in order:
    /// 1. Resolve the workspace location to a local path.
    /// 2. Allocate a `Connect` message and post it to the sender's `connect`
    ///    port to set up a byte stream with the client.
    /// 3. Bind a loopback TCP listener while that stream is being completed.
    /// 4. Run `do_rsync` against the listener's address while accepting one
    ///    local connection and copying bytes in both directions between it and
    ///    the stream.
    ///
    /// Any failure in these steps is sent to the `result` port as a string
    /// rather than returned, so the handler itself always returns `Ok(())`.
    async fn handle(
        &mut self,
        cx: &hyperactor::Context<Self>,
        RsyncMessage {
            workspace,
            connect,
            result,
        }: RsyncMessage,
    ) -> Result<(), anyhow::Error> {
        // The whole operation lives in one async block so that every error is
        // captured in `res` and forwarded below.
        let res = async {
            let workspace = workspace
                .resolve()
                .context("resolving workspace location")?;
            let (connect_msg, completer) = Connect::allocate(cx.self_addr().clone(), cx);
            connect.post(cx, connect_msg);

            // some machines (e.g. github CI) do not have ipv6, so try ipv6 then fallback to ipv4
            let ipv6_lo: SocketAddr = "[::1]:0".parse()?;
            let ipv4_lo: SocketAddr = "127.0.0.1:0".parse()?;
            let addrs: [SocketAddr; 2] = [ipv6_lo, ipv4_lo];

            // Bind the local listener and finish establishing the stream
            // concurrently.
            let (listener, mut stream) = try_join!(
                TcpListener::bind(&addrs[..]).err_into(),
                completer.complete(),
            )?;
            let addr = listener.local_addr()?;
            // rsync connects to the local listener; the second future accepts
            // that connection and relays its bytes over `stream`.
            let (rsync_result, _) = try_join!(do_rsync(&addr, &workspace), async move {
                let (mut local, _) = listener.accept().await?;
                tokio::io::copy_bidirectional(&mut stream, &mut local).await?;
                anyhow::Ok(())
            },)?;
            anyhow::Ok(rsync_result)
        }
        .await;
        // Errors are sent as their pretty `Debug` rendering.
        result.post(cx, res.map_err(|e| format!("{:#?}", e)));
        Ok(())
    }
}
388
389pub async fn rsync_mesh<C: context::Actor + Copy + Unpin>(
390    cx: C,
391    actor_mesh: &ActorMesh<RsyncActor>,
392    local_workspace: PathBuf,
393    remote_workspace: WorkspaceLocation,
394) -> Result<Vec<RsyncResult>> {
395    use ndslice::View;
396
397    // Spawn a rsync daemon to accept incoming connections from actors.
398    let daemon = RsyncDaemon::spawn(TcpListener::bind(("::1", 0)).await?, &local_workspace).await?;
399    let daemon_addr = daemon.addr();
400
401    let (rsync_conns_tx, rsync_conns_rx) = cx.mailbox().open_port::<Connect>();
402    let num_actors = actor_mesh.region().num_ranks();
403
404    let res = try_join!(
405        rsync_conns_rx
406            .take(num_actors)
407            .err_into::<anyhow::Error>()
408            .try_for_each_concurrent(None, |connect| async move {
409                let (mut local, mut stream) = try_join!(
410                    TcpStream::connect(*daemon_addr).err_into(),
411                    accept(cx, cx.instance().self_addr().clone(), connect),
412                )?;
413                tokio::io::copy_bidirectional(&mut local, &mut stream).await?;
414                anyhow::Ok(())
415            })
416            .boxed(),
417        async move {
418            let (result_tx, result_rx) = cx.mailbox().open_port::<Result<RsyncResult, String>>();
419            actor_mesh.cast(
420                &cx,
421                RsyncMessage {
422                    connect: rsync_conns_tx.bind(),
423                    result: result_tx.bind(),
424                    workspace: remote_workspace,
425                },
426            )?;
427            let res: Vec<RsyncResult> = result_rx
428                .take(num_actors)
429                .map(|res| res?.map_err(anyhow::Error::msg))
430                .try_collect()
431                .await?;
432            anyhow::Ok(res)
433        },
434    );
435
436    // Kill rsync server and attempt to grab the logs.
437    let logs = daemon.shutdown().await;
438
439    // Return results, attaching rsync daemon logs on error.
440    match res {
441        Ok(((), results)) => {
442            let _ = logs?;
443            Ok(results)
444        }
445        Err(err) => match logs {
446            Ok(logs) => Err(err).with_context(|| format!("rsync server logs: {}", logs)),
447            Err(shutdown_err) => {
448                warn!("failed to read logs from rsync daemon: {:?}", shutdown_err);
449                Err(err)
450            }
451        },
452    }
453}
454
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use anyhow::anyhow;
    use hyperactor_mesh::ActorMesh;
    use hyperactor_mesh::context;
    use hyperactor_mesh::test_utils;
    use tempfile::TempDir;
    use tokio::fs;
    use tokio::net::TcpListener;

    use super::*;

    /// End-to-end check of `RsyncDaemon` + `do_rsync` without any actors: a
    /// file placed in `input` must show up in `output`.
    #[tokio::test]
    // TODO: OSS: Cannot assign requested address (os error 99)
    #[cfg_attr(not(fbcode_build), ignore)]
    async fn test_simple() -> Result<()> {
        let input = TempDir::new()?;
        fs::write(input.path().join("foo.txt"), "hello world").await?;

        let output = TempDir::new()?;

        // The daemon *serves* the directory it is given and `do_rsync` *pulls*
        // into the directory it is given, so the daemon exports `input` and
        // the client targets `output`. (The other way round, `--delete` would
        // empty `input` and the comparison below would pass on two empty
        // directories.)
        let server = TcpListener::bind(("::", 0)).await?;
        let daemon = RsyncDaemon::spawn(server, input.path()).await?;
        do_rsync(daemon.addr(), output.path()).await?;
        daemon.shutdown().await?;

        // The file must actually have been copied, not merely "no difference".
        assert_eq!(
            fs::read_to_string(output.path().join("foo.txt")).await?,
            "hello world"
        );
        assert!(!dir_diff::is_different(&input, &output).map_err(|e| anyhow!("{:?}", e))?);

        Ok(())
    }

    /// Runs `rsync_mesh` against a single-actor mesh and checks that the
    /// target workspace ends up identical to the source, including removal of
    /// files that only existed in the target.
    #[tokio::test]
    // TODO: OSS: Cannot assign requested address (os error 99)
    #[cfg_attr(not(fbcode_build), ignore)]
    async fn test_rsync_actor_and_mesh() -> Result<()> {
        // Create source workspace with test files
        let source_workspace = TempDir::new()?;
        fs::write(source_workspace.path().join("test1.txt"), "content1").await?;
        fs::write(source_workspace.path().join("test2.txt"), "content2").await?;
        fs::create_dir(source_workspace.path().join("subdir")).await?;
        fs::write(source_workspace.path().join("subdir/test3.txt"), "content3").await?;

        // Create target workspace for the actors, pre-populated with entries
        // that do not exist in the source.
        let target_workspace = TempDir::new()?;
        fs::create_dir(target_workspace.path().join("subdir5")).await?;
        fs::write(target_workspace.path().join("foo.txt"), "something").await?;

        // Set up an actor mesh with a single RsyncActor (unity extent)
        let cx = context().await;
        let instance = cx.actor_instance;
        let mut host_mesh = test_utils::local_host_mesh(1).await;
        let proc_mesh = host_mesh
            .spawn(instance, "rsync_test", ndslice::Extent::unity(), None, None)
            .await
            .unwrap();
        // Spawn actor mesh with RsyncActors
        let actor_mesh: ActorMesh<RsyncActor> =
            proc_mesh.spawn(instance, "rsync_test", &()).await?;

        // Test rsync_mesh function - this coordinates rsync operations across the mesh
        let results = rsync_mesh(
            instance,
            &actor_mesh,
            source_workspace.path().to_path_buf(),
            WorkspaceLocation::Constant(target_workspace.path().to_path_buf()),
        )
        .await?;

        // Verify we got results back
        assert_eq!(results.len(), 1); // We have 1 actor in the mesh

        let rsync_result = &results[0];

        // Verify that files were transferred (should be at least the files we created)
        // Note: The exact files detected may vary based on rsync's itemization,
        // but we should have some indication of transfer activity
        println!("Rsync result: {:#?}", rsync_result);

        // Verify we copied correctly.
        assert!(
            !dir_diff::is_different(&source_workspace, &target_workspace)
                .map_err(|e| anyhow!("{:?}", e))?
        );

        let _ = host_mesh.shutdown(instance).await;
        Ok(())
    }

    /// Checks `RsyncResult::parse_from_output` against a canned
    /// `--itemize-changes` transcript covering received files, a locally
    /// created directory and a deletion message.
    #[tokio::test]
    async fn test_rsync_result_parsing() -> Result<()> {
        // Test the parsing logic with mock rsync output
        let stdout = r#">f+++++++++ test1.txt
>f+++++++++ test2.txt
cd+++++++++ subdir/
>f+++++++++ subdir/test3.txt
*deleting   old_file.txt
"#;

        let result = RsyncResult::parse_from_output(stdout)?;

        // Define the expected changes
        let expected_changes = vec![
            Change {
                change_type: ChangeType::Action(ChangeAction::Received, FileType::File),
                path: PathBuf::from("test1.txt"),
            },
            Change {
                change_type: ChangeType::Action(ChangeAction::Received, FileType::File),
                path: PathBuf::from("test2.txt"),
            },
            Change {
                change_type: ChangeType::Action(ChangeAction::LocalChange, FileType::Directory),
                path: PathBuf::from("subdir/"),
            },
            Change {
                change_type: ChangeType::Action(ChangeAction::Received, FileType::File),
                path: PathBuf::from("subdir/test3.txt"),
            },
            Change {
                change_type: ChangeType::Message(ChangeMessage::Deleting),
                path: PathBuf::from("old_file.txt"),
            },
        ];

        // Verify we have all expected changes
        assert_eq!(result.changes.len(), expected_changes.len());

        // Compare each change individually so a failure names the offending entry
        for (actual, expected) in result.changes.iter().zip(expected_changes.iter()) {
            assert_eq!(
                actual, expected,
                "Change mismatch: actual={:?}, expected={:?}",
                actual, expected
            );
        }

        // Verify the entire result matches
        assert_eq!(result.changes, expected_changes);

        Ok(())
    }
}