monarch_hyperactor/code_sync/
rsync.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::io::ErrorKind;
10use std::net::SocketAddr;
11use std::path::Path;
12use std::path::PathBuf;
13use std::process::Stdio;
14use std::time::Duration;
15
16use anyhow::Context;
17use anyhow::Result;
18use anyhow::bail;
19use anyhow::ensure;
20use async_trait::async_trait;
21use futures::FutureExt;
22use futures::StreamExt;
23use futures::TryFutureExt;
24use futures::TryStreamExt;
25use futures::try_join;
26use hyperactor::Actor;
27use hyperactor::Bind;
28use hyperactor::Handler;
29use hyperactor::Unbind;
30use hyperactor::context;
31use hyperactor::reference;
32use hyperactor_mesh::ActorMesh;
33use hyperactor_mesh::connect::Connect;
34use hyperactor_mesh::connect::accept;
35#[cfg(feature = "packaged_rsync")]
36use lazy_static::lazy_static;
37use nix::sys::signal;
38use nix::sys::signal::Signal;
39use nix::unistd::Pid;
40use serde::Deserialize;
41use serde::Serialize;
42use tempfile::TempDir;
43#[cfg(feature = "packaged_rsync")]
44use tempfile::TempPath;
45use tokio::fs;
46use tokio::net::TcpListener;
47use tokio::net::TcpStream;
48use tokio::process::Child;
49use tokio::process::Command;
50#[cfg(feature = "packaged_rsync")]
51use tokio::sync::OnceCell;
52use tracing::warn;
53use typeuri::Named;
54
55use crate::code_sync::WorkspaceLocation;
56
57#[cfg(feature = "packaged_rsync")]
58lazy_static! {
59    static ref RSYNC_BIN_PATH: OnceCell<TempPath> = OnceCell::new();
60}
61
62async fn get_rsync_bin_path() -> Result<&'static Path> {
63    #[cfg(feature = "packaged_rsync")]
64    {
65        use std::io::Write;
66        use std::os::unix::fs::PermissionsExt;
67        Ok(RSYNC_BIN_PATH
68            .get_or_try_init(|| async {
69                tokio::task::spawn_blocking(|| {
70                    let mut tmp = tempfile::NamedTempFile::with_prefix("rsync.")?;
71                    let rsync_bin = include_bytes!("rsync.bin");
72                    tmp.write_all(rsync_bin)?;
73                    let bin_path = tmp.into_temp_path();
74                    std::fs::set_permissions(&bin_path, std::fs::Permissions::from_mode(0o755))?;
75                    anyhow::Ok(bin_path)
76                })
77                .await?
78            })
79            .await?)
80    }
81    #[cfg(not(feature = "packaged_rsync"))]
82    {
83        Ok(Path::new("rsync"))
84    }
85}
86
87/// Represents a single file change from rsync
88#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
89pub struct Change {
90    /// The type of change that occurred
91    pub change_type: ChangeType,
92    /// The path of the file that changed
93    pub path: PathBuf,
94}
95
96#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97pub enum ChangeMessage {
98    /// Path was deleted
99    Deleting,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
103pub enum ChangeAction {
104    /// File was received.
105    Received,
106    // Path was changed/created locally.
107    LocalChange,
108    NotTransferred,
109}
110
111/// The type of change that occurred to a file
112#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
113pub enum ChangeType {
114    Message(ChangeMessage),
115    Action(ChangeAction, FileType),
116}
117
118/// The type of file that changed
119#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
120pub enum FileType {
121    /// Regular file
122    File,
123    /// Directory
124    Directory,
125    /// Symbolic link
126    Symlink,
127}
128
129/// Represents the result of an rsync operation with details about what was transferred
130#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Named)]
131pub struct RsyncResult {
132    /// All changes that occurred during the rsync operation
133    pub changes: Vec<Change>,
134}
135wirevalue::register_type!(RsyncResult);
136
137impl RsyncResult {
138    /// Create an empty rsync result
139    pub fn empty() -> Self {
140        Self {
141            changes: Vec::new(),
142        }
143    }
144
145    /// Parse rsync output to extract file transfer information
146    /// Since create_dir_all ensures the workspace exists, we can assume all stdout lines are changes
147    fn parse_from_output(stdout: &str) -> Result<Self> {
148        let mut changes = Vec::new();
149
150        // Parse stdout for file operations (when using --itemize-changes)
151        // All lines in stdout represent changes since the workspace directory exists
152
153        // rsync itemize format: YXcstpoguax path
154        // Y = update type (>, c, h, ., etc.)
155        // X = file type (f=file, d=directory, L=symlink, etc.)
156        for line in stdout.lines() {
157            let line = line.trim();
158            let (raw_changes, path) = line.split_at(11);
159            let raw_changes = raw_changes.trim();
160            let path = &path[1..]; // remove leading space
161
162            let mut iter = raw_changes.chars();
163            let change_type = match iter.next().context("missing change type")? {
164                '*' => ChangeType::Message(match iter.as_str() {
165                    "deleting" => ChangeMessage::Deleting,
166                    _ => bail!("unexpected change message: {}", raw_changes),
167                }),
168                c => {
169                    let atype = match c {
170                        '.' => ChangeAction::NotTransferred,
171                        '>' => ChangeAction::Received,
172                        'c' => ChangeAction::LocalChange,
173                        _ => bail!("unexpected change type: {}", raw_changes),
174                    };
175                    let file_type = match iter.next().context("missing file type")? {
176                        'f' => FileType::File,
177                        'd' => FileType::Directory,
178                        'L' => FileType::Symlink,
179                        _ => bail!("unexpected file type: {}", raw_changes),
180                    };
181                    ChangeType::Action(atype, file_type)
182                }
183            };
184
185            changes.push(Change {
186                change_type,
187                path: PathBuf::from(path),
188            });
189        }
190
191        Ok(Self { changes })
192    }
193}
194
195pub async fn do_rsync(addr: &SocketAddr, workspace: &Path) -> Result<RsyncResult> {
196    // Make sure the target workspace exists, mainly to avoid the "created director ..."
197    // line in rsync output.
198    fs::create_dir_all(workspace).await?;
199
200    let rsync_bin_path = get_rsync_bin_path().await?;
201    let output = Command::new(rsync_bin_path)
202        .arg("--archive")
203        .arg("--delete")
204        // Show detailed changes for each file
205        .arg("--itemize-changes")
206        // By setting these flags, we make `rsync` immune to multiple invocations
207        // targeting the same dir, which can happen if we don't take care to only
208        // allow one worker on a given host to do the `rsync`.
209        .arg("--delete-after")
210        .arg("--delay-updates")
211        .arg("--exclude=.rsync-tmp.*")
212        .arg(format!("--partial-dir=.rsync-tmp.{}", addr.port()))
213        .arg(format!("rsync://{}/workspace", addr))
214        .arg(format!("{}/", workspace.display()))
215        .stdout(Stdio::piped())
216        .stderr(Stdio::piped())
217        .output()
218        .await
219        .map_err(|e| {
220            if e.kind() == std::io::ErrorKind::NotFound {
221                anyhow::anyhow!(
222                    "rsync binary: '{}' does not exist. Please ensure 'rsync' is installed and available in PATH.",
223                    rsync_bin_path.display()
224                )
225            } else {
226                anyhow::anyhow!("failed to execute rsync: {}", e)
227            }
228        })?;
229
230    output
231        .status
232        .exit_ok()
233        .with_context(|| format!("rsync failed: {}", String::from_utf8_lossy(&output.stderr)))?;
234
235    RsyncResult::parse_from_output(&String::from_utf8(output.stdout)?)
236}
237
238#[derive(Debug)]
239pub struct RsyncDaemon {
240    child: Child,
241    state: TempDir,
242    addr: SocketAddr,
243}
244
245impl RsyncDaemon {
246    pub async fn spawn(listener: TcpListener, workspace: &Path) -> Result<Self> {
247        let state = TempDir::with_prefix("rsyncd.")?;
248
249        // Write rsync config file
250        // TODO(agallagher): We can setup a secrets file to provide some measure of
251        // security and prevent stray rsync calls from hitting the server.
252        let content = format!(
253            r#"\
254[workspace]
255    path = {workspace}
256    use chroot = no
257    list = no
258    read only = true
259    write only = false
260    uid = {uid}
261    hosts allow = localhost
262"#,
263            workspace = workspace.display(),
264            uid = nix::unistd::getuid().as_raw(),
265        );
266        let config = state.path().join("rsync.config");
267        fs::write(&config, content).await?;
268
269        // Find free port.  This is potentially racy, as some process could
270        // potentially bind to this port in between now and when `rsync` starts up
271        // below.  But I'm not sure a better way to do this, as rsync doesn't appear
272        // to support `rsync --sockopts=SO_PORTREUSE` (to share this port we've
273        // reserved) or `--port=0` (to pick a free port -- it'll just always use
274        // 873).
275        let addr = listener.local_addr()?;
276        std::mem::drop(listener);
277
278        // Spawn the rsync daemon.
279        let mut child = Command::new(get_rsync_bin_path().await?)
280            .arg("--daemon")
281            .arg("--no-detach")
282            .arg(format!("--address={}", addr.ip()))
283            .arg(format!("--port={}", addr.port()))
284            .arg(format!("--config={}", config.display()))
285            .arg(format!("--log-file={}/log", state.path().display()))
286            .kill_on_drop(true)
287            .spawn()?;
288
289        // Wait until the rsync daemon is ready to connect via polling it (I tried polling
290        // the log file to wait for the "listening" log line, but that gets prevented *before*
291        // it actually starts the listening loop).
292        tokio::select! {
293            res = child.wait() => bail!("unexpected early exit: {:?}", res),
294            res = async {
295                loop {
296                    match TcpStream::connect(addr).await {
297                        Err(err) if err.kind() == ErrorKind::ConnectionRefused => {
298                            tokio::time::sleep(Duration::from_millis(1)).await
299                        }
300                        Err(err) => return Err(err.into()),
301                        Ok(_) => break,
302                    }
303                }
304                anyhow::Ok(())
305            } => res?,
306        }
307
308        Ok(Self { child, state, addr })
309    }
310
311    pub fn addr(&self) -> &SocketAddr {
312        &self.addr
313    }
314
315    pub async fn shutdown(mut self) -> Result<String> {
316        let logs = fs::read_to_string(self.state.path().join("log")).await;
317        let id = self.child.id().context("missing pid")?;
318        let pid = Pid::from_raw(id as i32);
319        signal::kill(pid, Signal::SIGINT)?;
320        let status = self.child.wait().await?;
321        // rsync exists with 20 when sent SIGINT.
322        ensure!(status.code() == Some(20));
323        Ok(logs?)
324    }
325}
326
327#[derive(Debug, Clone, Named, Serialize, Deserialize, Bind, Unbind)]
328pub struct RsyncMessage {
329    /// The connect message to create a duplex bytestream with the client.
330    pub connect: reference::PortRef<Connect>,
331    /// A port to send back the rsync result or any errors.
332    pub result: reference::PortRef<Result<RsyncResult, String>>,
333    /// The location of the workspace to sync.
334    pub workspace: WorkspaceLocation,
335}
336wirevalue::register_type!(RsyncMessage);
337
338#[derive(Debug, Default)]
339#[hyperactor::export(spawn = true, handlers = [RsyncMessage { cast = true }])]
340pub struct RsyncActor {
341    //workspace: WorkspaceLocation,
342}
343
344impl Actor for RsyncActor {}
345
346#[async_trait]
347impl Handler<RsyncMessage> for RsyncActor {
348    async fn handle(
349        &mut self,
350        cx: &hyperactor::Context<Self>,
351        RsyncMessage {
352            workspace,
353            connect,
354            result,
355        }: RsyncMessage,
356    ) -> Result<(), anyhow::Error> {
357        let res = async {
358            let workspace = workspace
359                .resolve()
360                .context("resolving workspace location")?;
361            let (connect_msg, completer) = Connect::allocate(cx.self_id().clone(), cx);
362            connect.send(cx, connect_msg)?;
363
364            // some machines (e.g. github CI) do not have ipv6, so try ipv6 then fallback to ipv4
365            let ipv6_lo: SocketAddr = "[::1]:0".parse()?;
366            let ipv4_lo: SocketAddr = "127.0.0.1:0".parse()?;
367            let addrs: [SocketAddr; 2] = [ipv6_lo, ipv4_lo];
368
369            let (listener, mut stream) = try_join!(
370                TcpListener::bind(&addrs[..]).err_into(),
371                completer.complete(),
372            )?;
373            let addr = listener.local_addr()?;
374            let (rsync_result, _) = try_join!(do_rsync(&addr, &workspace), async move {
375                let (mut local, _) = listener.accept().await?;
376                tokio::io::copy_bidirectional(&mut stream, &mut local).await?;
377                anyhow::Ok(())
378            },)?;
379            anyhow::Ok(rsync_result)
380        }
381        .await;
382        result.send(cx, res.map_err(|e| format!("{:#?}", e)))?;
383        Ok(())
384    }
385}
386
387pub async fn rsync_mesh<C: context::Actor + Copy + Unpin>(
388    cx: C,
389    actor_mesh: &ActorMesh<RsyncActor>,
390    local_workspace: PathBuf,
391    remote_workspace: WorkspaceLocation,
392) -> Result<Vec<RsyncResult>> {
393    use ndslice::View;
394
395    // Spawn a rsync daemon to accept incoming connections from actors.
396    let daemon = RsyncDaemon::spawn(TcpListener::bind(("::1", 0)).await?, &local_workspace).await?;
397    let daemon_addr = daemon.addr();
398
399    let (rsync_conns_tx, rsync_conns_rx) = cx.mailbox().open_port::<Connect>();
400    let num_actors = actor_mesh.region().num_ranks();
401
402    let res = try_join!(
403        rsync_conns_rx
404            .take(num_actors)
405            .err_into::<anyhow::Error>()
406            .try_for_each_concurrent(None, |connect| async move {
407                let (mut local, mut stream) = try_join!(
408                    TcpStream::connect(daemon_addr.clone()).err_into(),
409                    accept(cx, cx.instance().self_id().clone(), connect),
410                )?;
411                tokio::io::copy_bidirectional(&mut local, &mut stream).await?;
412                anyhow::Ok(())
413            })
414            .boxed(),
415        async move {
416            let (result_tx, result_rx) = cx.mailbox().open_port::<Result<RsyncResult, String>>();
417            actor_mesh.cast(
418                &cx,
419                RsyncMessage {
420                    connect: rsync_conns_tx.bind(),
421                    result: result_tx.bind(),
422                    workspace: remote_workspace,
423                },
424            )?;
425            let res: Vec<RsyncResult> = result_rx
426                .take(num_actors)
427                .map(|res| res?.map_err(anyhow::Error::msg))
428                .try_collect()
429                .await?;
430            anyhow::Ok(res)
431        },
432    );
433
434    // Kill rsync server and attempt to grab the logs.
435    let logs = daemon.shutdown().await;
436
437    // Return results, attaching rsync daemon logs on error.
438    match res {
439        Ok(((), results)) => {
440            let _ = logs?;
441            Ok(results)
442        }
443        Err(err) => match logs {
444            Ok(logs) => Err(err).with_context(|| format!("rsync server logs: {}", logs)),
445            Err(shutdown_err) => {
446                warn!("failed to read logs from rsync daemon: {:?}", shutdown_err);
447                Err(err)
448            }
449        },
450    }
451}
452
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use anyhow::anyhow;
    use hyperactor_mesh::ActorMesh;
    use hyperactor_mesh::context;
    use hyperactor_mesh::test_utils;
    use tempfile::TempDir;
    use tokio::fs;
    use tokio::net::TcpListener;

    use super::*;

    /// Serves a directory from a local daemon and pulls it into an empty directory.
    #[tokio::test]
    // TODO: OSS: Cannot assign requested address (os error 99)
    #[cfg_attr(not(fbcode_build), ignore)]
    async fn test_simple() -> Result<()> {
        let input = TempDir::new()?;
        fs::write(input.path().join("foo.txt"), "hello world").await?;

        let output = TempDir::new()?;

        // The daemon must serve `input` and `do_rsync` must pull into `output`. Doing it
        // the other way around just deletes `foo.txt` from `input`, which still leaves
        // the two directories "equal".
        let server = TcpListener::bind(("::", 0)).await?;
        let daemon = RsyncDaemon::spawn(server, input.path()).await?;
        do_rsync(daemon.addr(), output.path()).await?;
        daemon.shutdown().await?;

        assert_eq!(
            fs::read_to_string(output.path().join("foo.txt")).await?,
            "hello world"
        );
        assert!(!dir_diff::is_different(&input, &output).map_err(|e| anyhow!("{:?}", e))?);

        Ok(())
    }

    #[tokio::test]
    // TODO: OSS: Cannot assign requested address (os error 99)
    #[cfg_attr(not(fbcode_build), ignore)]
    async fn test_rsync_actor_and_mesh() -> Result<()> {
        // Create source workspace with test files
        let source_workspace = TempDir::new()?;
        fs::write(source_workspace.path().join("test1.txt"), "content1").await?;
        fs::write(source_workspace.path().join("test2.txt"), "content2").await?;
        fs::create_dir(source_workspace.path().join("subdir")).await?;
        fs::write(source_workspace.path().join("subdir/test3.txt"), "content3").await?;

        // Create the target workspace, with extra entries that the sync must delete.
        let target_workspace = TempDir::new()?;
        fs::create_dir(target_workspace.path().join("subdir5")).await?;
        fs::write(target_workspace.path().join("foo.txt"), "something").await?;

        // Set up a proc mesh with a single proc (unit extent).
        let cx = context().await;
        let instance = cx.actor_instance;
        let mut host_mesh = test_utils::local_host_mesh(1).await;
        let proc_mesh = host_mesh
            .spawn(instance, "rsync_test", ndslice::Extent::unity())
            .await
            .unwrap();
        // Spawn actor mesh with RsyncActors
        let actor_mesh: ActorMesh<RsyncActor> =
            proc_mesh.spawn(instance, "rsync_test", &()).await?;

        // Test rsync_mesh function - this coordinates rsync operations across the mesh
        let results = rsync_mesh(
            instance,
            &actor_mesh,
            source_workspace.path().to_path_buf(),
            WorkspaceLocation::Constant(target_workspace.path().to_path_buf()),
        )
        .await?;

        // Verify we got results back
        assert_eq!(results.len(), 1); // We have 1 actor in the mesh

        let rsync_result = &results[0];

        // The exact itemization may vary between rsync versions, so it is only printed
        // for inspection.
        println!("Rsync result: {:#?}", rsync_result);

        // Verify we copied correctly.
        assert!(
            !dir_diff::is_different(&source_workspace, &target_workspace)
                .map_err(|e| anyhow!("{:?}", e))?
        );

        let _ = host_mesh.shutdown(instance).await;
        Ok(())
    }

    /// Parses canned `--itemize-changes` output. Purely synchronous, so no runtime needed.
    #[test]
    fn test_rsync_result_parsing() -> Result<()> {
        // Test the parsing logic with mock rsync output
        let stdout = r#">f+++++++++ test1.txt
>f+++++++++ test2.txt
cd+++++++++ subdir/
>f+++++++++ subdir/test3.txt
*deleting   old_file.txt
"#;

        let result = RsyncResult::parse_from_output(stdout)?;

        // Define the expected changes
        let expected_changes = vec![
            Change {
                change_type: ChangeType::Action(ChangeAction::Received, FileType::File),
                path: PathBuf::from("test1.txt"),
            },
            Change {
                change_type: ChangeType::Action(ChangeAction::Received, FileType::File),
                path: PathBuf::from("test2.txt"),
            },
            Change {
                change_type: ChangeType::Action(ChangeAction::LocalChange, FileType::Directory),
                path: PathBuf::from("subdir/"),
            },
            Change {
                change_type: ChangeType::Action(ChangeAction::Received, FileType::File),
                path: PathBuf::from("subdir/test3.txt"),
            },
            Change {
                change_type: ChangeType::Message(ChangeMessage::Deleting),
                path: PathBuf::from("old_file.txt"),
            },
        ];

        // Verify we have all expected changes
        assert_eq!(result.changes.len(), expected_changes.len());

        // Compare each change, for a precise message on the first mismatch
        for (actual, expected) in result.changes.iter().zip(expected_changes.iter()) {
            assert_eq!(
                actual, expected,
                "Change mismatch: actual={:?}, expected={:?}",
                actual, expected
            );
        }

        // Verify the entire result matches
        assert_eq!(result.changes, expected_changes);

        Ok(())
    }
}