monarch_conda/
sync.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::cmp::Ordering;
10use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::collections::HashMap;
13use std::io::ErrorKind;
14use std::os::unix::fs::MetadataExt;
15use std::os::unix::fs::PermissionsExt;
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::sync::mpsc::channel;
20use std::time::Duration;
21use std::time::SystemTime;
22use std::time::UNIX_EPOCH;
23
24use anyhow::Context;
25use anyhow::Result;
26use anyhow::bail;
27use anyhow::ensure;
28use async_tempfile::TempFile;
29use dashmap::DashMap;
30use dashmap::mapref::entry::Entry;
31use filetime::FileTime;
32use futures::SinkExt;
33use futures::StreamExt;
34use futures::try_join;
35use globset::Glob;
36use globset::GlobSet;
37use globset::GlobSetBuilder;
38use ignore::DirEntry;
39use ignore::WalkBuilder;
40use ignore::WalkState;
41use itertools::Itertools;
42use memmap2::MmapMut;
43use serde::Deserialize;
44use serde::Serialize;
45use tokio::fs;
46use tokio::io::AsyncRead;
47use tokio::io::AsyncReadExt;
48use tokio::io::AsyncWrite;
49use tokio::io::AsyncWriteExt;
50use tokio_util::codec::FramedRead;
51use tokio_util::codec::FramedWrite;
52use tokio_util::codec::LengthDelimitedCodec;
53
54use crate::diff::CondaFingerprint;
55use crate::replace::ReplacerBuilder;
56
/// Which side of the sync a file-list entry was reported by.
#[derive(Eq, PartialEq)]
enum Origin {
    /// The entry was reported for the source tree.
    Src,
    /// The entry was reported for the destination tree.
    Dst,
}
62
/// Kind of filesystem entry recorded for a path in a file list.
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
enum FileTypeInfo {
    /// A directory.
    Directory,
    /// A regular file; the flag is true when the owner-execute bit (`0o100`)
    /// is set in its mode.
    File(bool),
    /// A symbolic link.
    Symlink,
}
69
70impl FileTypeInfo {
71    fn same(&self, other: &FileTypeInfo) -> bool {
72        match (self, other) {
73            (FileTypeInfo::Directory, FileTypeInfo::Directory) => true,
74            (FileTypeInfo::File(_), FileTypeInfo::File(_)) => true,
75            (FileTypeInfo::Symlink, FileTypeInfo::Symlink) => true,
76            _ => false,
77        }
78    }
79}
80
/// Per-path metadata exchanged in file lists and compared to plan sync actions.
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
struct Metadata {
    /// Modification time of the entry.
    mtime: SystemTime,
    /// Kind of entry (directory, regular file, or symlink).
    ftype: FileTypeInfo,
}
86
/// What kind of entry the receiver expects the sender to transfer for a path.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Receive {
    /// A regular file; when `executable` is true the receiver adds execute
    /// permission to the file it writes.
    File { executable: bool },
    /// A symbolic link.
    Symlink,
}
92
/// A filesystem change planned for a path in the destination tree.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Action {
    /// Delete a path; `directory` is true when the path is a directory.
    Delete { directory: bool },
    /// Create a directory.
    Directory,
    /// Receive the path contents from the sender. The `SystemTime` is the
    /// source entry's mtime, which the receiver applies after writing the path.
    Receive(SystemTime, Receive),
}
102
/// Sent by the receiver to announce how many `FileHeader` requests follow.
#[derive(Debug, Serialize, Deserialize)]
struct FileSectionHeader {
    /// Number of paths the receiver is about to request.
    num: usize,
}
107
/// A request from the receiver for the contents of one path.
#[derive(Debug, Serialize, Deserialize)]
struct FileHeader {
    /// Requested path; the sender joins it onto its source root.
    path: PathBuf,
    /// True when the receiver expects a symlink target rather than file bytes.
    symlink: bool,
}
113
/// Payload description for one transferred path.
#[derive(Debug, Serialize, Deserialize)]
enum FileContents {
    /// The path is a symlink; carries the link target as read on the sender.
    Symlink(PathBuf),
    /// The path is a regular file; carries the number of content bytes that
    /// follow the header on the stream.
    File(u64),
}
119
/// Header the sender writes ahead of each transferred path.
#[derive(Debug, Serialize, Deserialize)]
struct FileContentsHeader {
    /// Path (as requested by the receiver) this header describes.
    path: PathBuf,
    /// Whether a symlink target or a run of file bytes is being delivered.
    contents: FileContents,
}
125
/// One message of the file-list stream sent from the sender to the receiver.
#[derive(Debug, Serialize, Deserialize)]
enum FileList {
    /// A path found in the source tree together with its metadata.
    Entry(PathBuf, Metadata),
    /// Marks the end of the file list.
    Done,
}
131
/// Accumulates file-list entries from both sides and derives the actions
/// needed to bring the destination in line with the source.
struct ActionsBuilder {
    /// Optional glob set; matching paths are skipped when a path has been seen
    /// on both sides and when emitting actions for source-only paths.
    ignores: Option<GlobSet>,
    /// Entries seen from only one side so far, keyed by path.
    state: DashMap<PathBuf, (Origin, Metadata)>,
    /// Actions already decided for paths that were seen on both sides.
    actions: DashMap<PathBuf, Action>,
    /// Ordering used to compare a source mtime (first argument) against a
    /// destination mtime (second argument).
    mtime_comparator: Box<dyn Fn(&SystemTime, &SystemTime) -> Ordering + Send + Sync + 'static>,
}
138
139impl ActionsBuilder {
140    fn new_with(
141        ignores: Option<GlobSet>,
142        mtime_comparator: Box<dyn Fn(&SystemTime, &SystemTime) -> Ordering + Send + Sync + 'static>,
143    ) -> Self {
144        Self {
145            ignores,
146            state: DashMap::new(),
147            actions: DashMap::new(),
148            mtime_comparator,
149        }
150    }
151
152    fn process(&self, origin: Origin, path: PathBuf, metadata: Metadata) -> Result<()> {
153        match self.state.entry(path) {
154            Entry::Occupied(val) => {
155                let (path, (existing_origin, existing_metadata)) = val.remove_entry();
156                if let Some(ignores) = &self.ignores {
157                    if ignores.is_match(path.as_path()) {
158                        return Ok(());
159                    }
160                }
161                ensure!(existing_origin != origin);
162                let (src, dst) = match origin {
163                    Origin::Dst => (existing_metadata, metadata),
164                    Origin::Src => (metadata, existing_metadata),
165                };
166                if src.ftype == FileTypeInfo::Directory && dst.ftype == FileTypeInfo::Directory {
167                    // --omit-dir-times
168                } else {
169                    match (self.mtime_comparator)(&src.mtime, &dst.mtime) {
170                        Ordering::Equal => {
171                            ensure!(
172                                src.ftype.same(&dst.ftype),
173                                "{}: {:?} != {:?}",
174                                path.display(),
175                                dst,
176                                src
177                            );
178                        }
179                        Ordering::Greater | Ordering::Less => {
180                            self.actions.insert(
181                                path,
182                                match src.ftype {
183                                    FileTypeInfo::File(executable) => {
184                                        Action::Receive(src.mtime, Receive::File { executable })
185                                    }
186                                    FileTypeInfo::Symlink => {
187                                        Action::Receive(src.mtime, Receive::Symlink)
188                                    }
189                                    FileTypeInfo::Directory => Action::Directory,
190                                },
191                            );
192                        }
193                    }
194                }
195            }
196            Entry::Vacant(entry) => {
197                entry.insert((origin, metadata));
198            }
199        }
200        Ok(())
201    }
202
203    fn process_src(&self, path: PathBuf, metadata: Metadata) -> Result<()> {
204        self.process(Origin::Src, path, metadata)
205    }
206
207    fn process_dst(&self, path: PathBuf, metadata: Metadata) -> Result<()> {
208        self.process(Origin::Dst, path, metadata)
209    }
210
211    fn into_actions(self) -> HashMap<PathBuf, Action> {
212        let mut actions: HashMap<_, _> = self.actions.into_iter().collect();
213        for (path, (origin, metadata)) in self.state.into_iter() {
214            match origin {
215                Origin::Src => {
216                    if let Some(ignores) = &self.ignores {
217                        if ignores.is_match(path.as_path()) {
218                            continue;
219                        }
220                    }
221                    actions.insert(
222                        path,
223                        match metadata.ftype {
224                            FileTypeInfo::File(executable) => {
225                                Action::Receive(metadata.mtime, Receive::File { executable })
226                            }
227                            FileTypeInfo::Directory => Action::Directory,
228                            FileTypeInfo::Symlink => {
229                                Action::Receive(metadata.mtime, Receive::Symlink)
230                            }
231                        },
232                    );
233                }
234                Origin::Dst => {
235                    actions.insert(
236                        path,
237                        Action::Delete {
238                            directory: matches!(metadata.ftype, FileTypeInfo::Directory),
239                        },
240                    );
241                }
242            }
243        }
244        actions
245    }
246}
247
248fn walk_dir<
249    E: Into<anyhow::Error>,
250    F: Fn(PathBuf, Metadata) -> Result<(), E> + Sync + Send + 'static,
251>(
252    src: PathBuf,
253    callback: F,
254) -> Result<()> {
255    let (error_tx, error_rx) = channel();
256
257    let src_handle = src.clone();
258    let handle_ent = move |entry: DirEntry| -> Result<()> {
259        let metadata = entry.metadata()?;
260        callback(
261            entry
262                .path()
263                .strip_prefix(src_handle.clone())
264                .context("sub path")?
265                .to_path_buf(),
266            Metadata {
267                mtime: UNIX_EPOCH
268                    + Duration::new(
269                        metadata.mtime().try_into()?,
270                        metadata.mtime_nsec().try_into()?,
271                    ),
272                ftype: if metadata.file_type().is_file() {
273                    let mode = metadata.permissions().mode();
274                    FileTypeInfo::File(mode & 0o100 != 0)
275                } else if metadata.file_type().is_dir() {
276                    FileTypeInfo::Directory
277                } else if metadata.file_type().is_symlink() {
278                    FileTypeInfo::Symlink
279                } else {
280                    bail!("unexpected file type")
281                },
282            },
283        )
284        .map_err(Into::into)?;
285        Ok(())
286    };
287
288    WalkBuilder::new(src)
289        .standard_filters(true)
290        .same_file_system(true)
291        .build_parallel()
292        .run(|| {
293            Box::new(|ent| match ent.map_err(Into::into).and_then(&handle_ent) {
294                Ok(()) => WalkState::Continue,
295                Err(err) => {
296                    error_tx.clone().send(err).unwrap();
297                    WalkState::Quit
298                }
299            })
300        });
301
302    match error_rx.try_recv() {
303        Ok(err) => Err(err),
304        _ => Ok(()),
305    }
306}
307
308pub async fn sender(
309    src: &Path,
310    from_receiver: impl AsyncRead + Unpin,
311    to_receiver: impl AsyncWrite + Unpin,
312) -> Result<()> {
313    let mut to_receiver = FramedWrite::new(to_receiver, LengthDelimitedCodec::new());
314    let mut from_receiver = FramedRead::new(from_receiver, LengthDelimitedCodec::new());
315
316    let (ent_tx, mut ent_rx) = tokio::sync::mpsc::unbounded_channel();
317    let src_clone = src.to_path_buf();
318    try_join!(
319        async {
320            tokio::task::spawn_blocking(move || {
321                walk_dir(src_clone.clone(), move |path, ent| ent_tx.send((path, ent)))
322            })
323            .await?
324        },
325        async {
326            // Send conda env fingerprint
327            let src_env = CondaFingerprint::from_env(src).await?;
328            to_receiver
329                .send(bincode::serialize(&src_env)?.into())
330                .await
331                .context("sending src conda fingerprint")?;
332            to_receiver.flush().await?;
333
334            // Send file lists.
335            while let Some((path, metadata)) = ent_rx.recv().await {
336                to_receiver
337                    .send(bincode::serialize(&FileList::Entry(path, metadata))?.into())
338                    .await
339                    .context("sending file ent")?;
340            }
341            to_receiver
342                .send(bincode::serialize(&FileList::Done)?.into())
343                .await
344                .context("sending file list end")?;
345            to_receiver.flush().await?;
346
347            anyhow::Ok(())
348        },
349    )?;
350
351    // Convert back to raw stream to send file header + contents.
352    to_receiver.flush().await?;
353    let mut to_receiver = to_receiver.into_inner();
354
355    let hdr: FileSectionHeader =
356        bincode::deserialize(&from_receiver.next().await.context("header")??)?;
357    for _ in 0..hdr.num {
358        let FileHeader { path, symlink } =
359            bincode::deserialize(&from_receiver.next().await.context("signature")??)?;
360        let fpath = src.join(&path);
361        if symlink {
362            let header = FileContentsHeader {
363                path,
364                contents: FileContents::Symlink(fs::read_link(&fpath).await?),
365            };
366            let header = bincode::serialize(&header)?;
367            to_receiver.write_all(&header.len().to_le_bytes()).await?;
368            to_receiver
369                .write_all(&header)
370                .await
371                .context("sending sig header")?;
372        } else {
373            let mut base = fs::File::open(src.join(&path)).await?;
374            let header = FileContentsHeader {
375                path,
376                contents: FileContents::File(base.metadata().await?.len()),
377            };
378            let header = bincode::serialize(&header)?;
379            to_receiver.write_all(&header.len().to_le_bytes()).await?;
380            to_receiver
381                .write_all(&header)
382                .await
383                .context("sending sig header")?;
384            tokio::io::copy(&mut base, &mut to_receiver).await?;
385        }
386    }
387    to_receiver.flush().await?;
388
389    Ok(())
390}
391
392async fn persist(tmp: TempFile, path: &Path) -> Result<(), std::io::Error> {
393    // Atomic rename the temp file into its final location.
394    match fs::rename(tmp.file_path(), &path).await {
395        Err(err) if err.kind() == ErrorKind::IsADirectory => {
396            async {
397                fs::remove_dir(&path).await?;
398                fs::rename(tmp.file_path(), &path).await
399            }
400            .await
401        }
402        other => other,
403    }?;
404    tmp.drop_async().await;
405    Ok(())
406}
407
408/// Helper function to set the FileTime for every file, symlink, and directory in a directory tree
409async fn set_mtime(path: &Path, mtime: SystemTime) -> Result<(), std::io::Error> {
410    let mtime = FileTime::from_system_time(mtime);
411    filetime::set_symlink_file_times(path, mtime.clone(), mtime)?;
412    Ok(())
413}
414
415async fn make_executable(path: &Path) -> Result<(), std::io::Error> {
416    let metadata = fs::metadata(path).await?;
417    let mut permissions = metadata.permissions();
418    let mode = permissions.mode();
419    permissions.set_mode(mode | 0o111);
420    fs::set_permissions(path, permissions).await?;
421    Ok(())
422}
423
/// Heuristically decides whether `buf` looks like binary data.
///
/// A buffer is binary when it contains a NUL byte, or when strictly more than
/// 30% of its bytes are neither printable ASCII (`0x20..=0x7E`) nor one of
/// `\n`, `\r`, `\t`. An empty buffer is not binary.
fn is_binary(buf: &[u8]) -> bool {
    // If any null byte is seen, treat as binary
    if buf.contains(&0) {
        return true;
    }

    // Count non-printable characters (excluding common control chars)
    let non_print = buf
        .iter()
        .filter(|&&b| !matches!(b, b'\n' | b'\r' | b'\t' | 0x20..=0x7E))
        .count();

    // If more than 30%, consider binary
    non_print * 100 > buf.len() * 30
}
439
440pub async fn receiver(
441    dst: &Path,
442    from_sender: impl AsyncRead + Unpin,
443    to_sender: impl AsyncWrite + Unpin,
444    replacement_paths: HashMap<PathBuf, PathBuf>,
445) -> Result<HashMap<PathBuf, Action>> {
446    let mut to_sender = FramedWrite::new(to_sender, LengthDelimitedCodec::new());
447    let mut from_sender = FramedRead::new(from_sender, LengthDelimitedCodec::new());
448
449    // Get the conda env fingerprint for the src and dst, and use that to create a
450    // comparator we can use to compare the mtimes between them.
451    let dst_env = CondaFingerprint::from_env(dst).await?;
452    let src_env: CondaFingerprint =
453        bincode::deserialize(&from_sender.next().await.context("fingerprint")??)?;
454    let comparator = CondaFingerprint::mtime_comparator(&src_env, &dst_env)?;
455    let ignores = GlobSetBuilder::new()
456        .add(Glob::new("**/*.pyc")?)
457        .add(Glob::new("**/__pycache__/")?)
458        .add(Glob::new("**/__pycache__/**/*")?)
459        .build()?;
460    let actions_builder = Arc::new(ActionsBuilder::new_with(Some(ignores), comparator));
461
462    // Process file lists from src/dst.
463    try_join!(
464        // Walk destination to grab file list.
465        async {
466            let dst = dst.to_path_buf();
467            let actions_builder = actions_builder.clone();
468            tokio::task::spawn_blocking(move || {
469                walk_dir(dst, move |path, ent| {
470                    actions_builder
471                        .process_dst(path.clone(), ent)
472                        .with_context(|| format!("{}", path.display()))
473                })
474            })
475            .await??;
476            anyhow::Ok(())
477        },
478        // Process file list sent from sender.
479        async {
480            while let FileList::Entry(path, metadata) =
481                bincode::deserialize(&from_sender.next().await.context("file list")??)?
482            {
483                actions_builder
484                    .process_src(path.clone(), metadata)
485                    .with_context(|| format!("{}", path.display()))?;
486            }
487            anyhow::Ok(())
488        }
489    )?;
490    let actions = Arc::into_inner(actions_builder)
491        .expect("should be done")
492        .into_actions();
493
494    // Demultiplex FS actions.
495    let mut dirs = BTreeSet::new();
496    let mut deletions = BTreeMap::new();
497    let mut files = HashMap::new();
498    for (path, action) in actions.iter() {
499        let path = path.clone();
500        match action {
501            Action::Directory => {
502                dirs.insert(path);
503            }
504            Action::Receive(mtime, recv) => {
505                files.insert(path, (*mtime, recv));
506            }
507            Action::Delete { directory } => {
508                deletions.insert(path, *directory);
509            }
510        }
511    }
512
513    try_join!(
514        async {
515            // Process deletions first.
516            for (path, is_dir) in deletions.into_iter().rev() {
517                let fpath = dst.join(path);
518                if is_dir {
519                    fs::remove_dir(&fpath).await
520                } else {
521                    fs::remove_file(&fpath).await
522                }
523                .with_context(|| format!("deleting {}", fpath.display()))?;
524            }
525
526            // Then create dirs.
527            for path in dirs.into_iter() {
528                let fpath = dst.join(path);
529                match fs::remove_file(&fpath).await {
530                    Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
531                    other => other,
532                }
533                .with_context(|| format!("clearing path {}", fpath.display()))?;
534                fs::create_dir(&fpath)
535                    .await
536                    .with_context(|| format!("creating dir {}", fpath.display()))?;
537            }
538
539            // Build a prefix path replacer.
540            let replacer = {
541                let mut builder = ReplacerBuilder::new();
542
543                // Add the conda src/dst prefixes.
544                let src_prefix = src_env.pack_meta.history.last_prefix()?;
545                let dst_prefix = dst_env.pack_meta.history.last_prefix()?;
546                if src_prefix != dst_prefix {
547                    builder.add(src_prefix, dst_prefix)?;
548                }
549
550                // Add custom replacements passed in.
551                for (src, dst) in replacement_paths.iter() {
552                    if src != dst {
553                        builder.add(src, dst)?;
554                    }
555                }
556
557                builder.build_if_non_empty()?
558            };
559
560            // Then pull file data and create files.
561            let mut from_sender = from_sender.into_inner();
562            for _ in 0..files.len() {
563                // Read a file header.
564                let len = from_sender.read_u64_le().await?;
565                let mut buf = vec![0u8; len as usize];
566                from_sender.read_exact(&mut buf).await?;
567                let FileContentsHeader { path, contents } =
568                    bincode::deserialize(&buf).context("delta header")?;
569                let fpath = dst.join(&path);
570                match (contents, files.get(&path).context("missing file")?) {
571                    // Read file contents and write to a tempfile.
572                    (FileContents::File(len), (mtime, Receive::File { executable })) => {
573                        let mut dst_tmp =
574                            TempFile::new_in(fpath.parent().context("parent")?).await?;
575                        let mut reader = (&mut from_sender).take(len);
576
577                        // Copy the file contents.
578                        if let Some(ref replacer) = replacer {
579                            // We do different copies dependending on whether the file is binary or not.
580                            let mut buf = vec![0; 4096];
581                            let len = reader.read(&mut buf[..]).await?;
582                            buf.truncate(len);
583                            if is_binary(&buf) {
584                                dst_tmp.write_all(&buf).await?;
585                                tokio::io::copy(&mut reader, &mut dst_tmp).await?;
586
587                                // For binary files, replace prefixes.
588                                // SAFETY: use mmap for fast in-place prefix replacement
589                                let mut mmap = unsafe { MmapMut::map_mut(&*dst_tmp)? };
590                                replacer.replace_inplace_padded(&mut mmap)?;
591                            } else {
592                                reader.read_to_end(&mut buf).await?;
593                                replacer.replace_inplace(&mut buf);
594                                dst_tmp.write_all(&buf).await?;
595                            }
596                        } else {
597                            tokio::io::copy(&mut reader, &mut dst_tmp).await?;
598                        }
599
600                        if *executable {
601                            make_executable(dst_tmp.file_path()).await?;
602                        }
603                        persist(dst_tmp, &fpath).await?;
604                        set_mtime(&fpath, *mtime).await?;
605                    }
606                    (FileContents::Symlink(mut target), (mtime, Receive::Symlink)) => {
607                        if let Some(ref replacer) = replacer {
608                            target = replacer.replace_path(target);
609                        }
610                        fs::symlink(target, &fpath).await?;
611                        set_mtime(&fpath, *mtime).await?;
612                    }
613                    _ => bail!("unexpected file contents"),
614                }
615            }
616            anyhow::Ok(())
617        },
618        async {
619            to_sender
620                .send(bincode::serialize(&FileSectionHeader { num: files.len() })?.into())
621                .await
622                .context("sending sig section header")?;
623            for (path, (_, recv)) in files.iter() {
624                to_sender
625                    .send(
626                        bincode::serialize(&FileHeader {
627                            path: path.clone(),
628                            symlink: matches!(recv, Receive::Symlink),
629                        })?
630                        .into(),
631                    )
632                    .await
633                    .context("sending sig header")?;
634            }
635            to_sender.flush().await?;
636            anyhow::Ok(())
637        },
638    )?;
639
640    Ok(actions)
641}
642
643pub async fn sync(src: &Path, dst: &Path) -> Result<HashMap<PathBuf, Action>> {
644    // Receiver -> Sender
645    let (recv, send) = tokio::io::duplex(5 * 1024 * 1024);
646    let (from_receiver, to_receiver) = tokio::io::split(recv);
647    let (from_sender, to_sender) = tokio::io::split(send);
648    let (actions, ()) = try_join!(
649        receiver(dst, from_sender, to_sender, HashMap::new()),
650        sender(src, from_receiver, to_receiver),
651    )?;
652    Ok(actions)
653}
654
655#[cfg(test)]
656mod tests {
657    use std::collections::HashMap;
658    use std::os::unix::fs::PermissionsExt;
659    use std::path::Path;
660    use std::path::PathBuf;
661    use std::time::Duration;
662    use std::time::SystemTime;
663
664    use anyhow::Result;
665    use rattler_conda_types::package::FileMode;
666    use tempfile::TempDir;
667    use tokio::fs;
668
669    use super::Action;
670    use super::make_executable;
671    use super::set_mtime;
672    use super::sync;
673    use crate::pack_meta_history::History;
674    use crate::pack_meta_history::HistoryRecord;
675    use crate::pack_meta_history::Offset;
676    use crate::pack_meta_history::OffsetRecord;
677    use crate::pack_meta_history::Offsets;
678    use crate::sync::Receive;
679
    /// Helper function to create a basic conda environment structure
    ///
    /// Populates `dirpath` with `conda-meta/`, `pack-meta/`, `bin/` and `lib/`
    /// fixtures, stamping the files with times derived from `mtime`. `prefix`
    /// is the prefix recorded in the last two `pack-meta/history.jsonl` records
    /// (defaults to `"base"`). Returns `dirpath` back to the caller, so a
    /// `TempDir` passed in is handed back rather than dropped here.
    async fn setup_conda_env<P: AsRef<Path>>(
        dirpath: P,
        mtime: SystemTime,
        prefix: Option<&str>,
    ) -> Result<P> {
        let env_path = dirpath.as_ref();

        // Create the basic directory structure
        fs::create_dir_all(&env_path).await?;
        fs::create_dir(&env_path.join("conda-meta")).await?;
        fs::create_dir(&env_path.join("pack-meta")).await?;

        // Create a basic conda-meta file to establish fingerprint
        add_file(
            env_path,
            "conda-meta/history",
            "==> 2023-01-01 00:00:00 <==\npackage install actions\n",
            mtime,
            false,
        )
        .await?;

        // Create a basic package record
        add_file(
            env_path,
            "conda-meta/package-1.0-0.json",
            r#"{
                "name": "package",
                "version": "1.0",
                "build": "0",
                "build_number": 0,
                "paths_data": {
                    "paths": [
                        {
                            "path": "bin/test-file",
                            "path_type": "hardlink",
                            "size_in_bytes": 10,
                            "mode": "text"
                        }
                    ]
                },
                "repodata_record": {
                    "package_record": {
                        "timestamp": 1672531200
                    }
                }
            }"#,
            mtime,
            false,
        )
        .await?;

        // Create offsets.jsonl
        let offsets = Offsets {
            entries: vec![OffsetRecord {
                path: PathBuf::from("bin/test-file"),
                mode: FileMode::Text,
                offsets: vec![Offset {
                    start: 0,
                    len: 10,
                    contents: None,
                }],
            }],
        };
        add_file(
            env_path,
            "pack-meta/offsets.jsonl",
            &offsets.to_str()?,
            mtime,
            false,
        )
        .await?;

        // Create the actual file referenced in the metadata
        fs::create_dir(env_path.join("bin")).await?;
        add_file(env_path, "bin/test-file", "test data\n", mtime, false).await?;

        // Create a file that was prefix-updated after the package was installed.
        // The window runs from `mtime + 20s` to `mtime + 25s`; the file below is
        // stamped at `window.0 + 5s`, i.e. exactly `window.1`.
        let window = (
            mtime + Duration::from_secs(20),
            mtime + Duration::from_secs(25),
        );
        fs::create_dir(env_path.join("lib")).await?;
        add_file(
            env_path,
            "lib/libfoo.so",
            "libfoo.so contents\n",
            window.0 + Duration::from_secs(5),
            false,
        )
        .await?;

        // Use provided prefix or default to "base"
        let prefix_path = PathBuf::from(prefix.unwrap_or("base"));

        // Create history.jsonl
        // Three records: a finished one at `mtime` for "/conda/prefix", then an
        // unfinished and a finished one for `prefix_path` at the two ends of
        // the window above.
        let history = History {
            entries: vec![
                HistoryRecord {
                    timestamp: mtime.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
                    prefix: PathBuf::from("/conda/prefix"),
                    finished: true,
                },
                HistoryRecord {
                    timestamp: window.0.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
                    prefix: prefix_path.clone(),
                    finished: false,
                },
                HistoryRecord {
                    timestamp: window.1.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
                    prefix: prefix_path,
                    finished: true,
                },
            ],
        };
        add_file(
            env_path,
            "pack-meta/history.jsonl",
            &history.to_str()?,
            mtime,
            false,
        )
        .await?;

        Ok(dirpath)
    }
807
808    /// Helper function to modify a file in the conda environment
809    async fn modify_file(
810        env_path: &Path,
811        file_path: &str,
812        content: &str,
813        mtime: SystemTime,
814    ) -> Result<()> {
815        let full_path = env_path.join(file_path);
816        fs::write(&full_path, content).await?;
817
818        // Set the file time
819        set_mtime(&full_path, mtime).await?;
820
821        Ok(())
822    }
823
824    /// Helper function to add a new file to the conda environment
825    async fn add_file(
826        env_path: &Path,
827        file_path: &str,
828        content: &str,
829        mtime: SystemTime,
830        executable: bool,
831    ) -> Result<()> {
832        let full_path = env_path.join(file_path);
833        fs::write(&full_path, content).await?;
834
835        if executable {
836            make_executable(&full_path).await?;
837        }
838
839        // Set the file time
840        set_mtime(&full_path, mtime).await?;
841
842        Ok(())
843    }
844
845    /// Helper function to verify file content
846    async fn verify_file_content(path1: &Path, path2: &Path) -> Result<bool> {
847        let content1 = fs::read_to_string(path1).await?;
848        let content2 = fs::read_to_string(path2).await?;
849        Ok(content1 == content2)
850    }
851
852    /// Helper function to verify file permissions
853    async fn verify_file_permissions(path: &Path, expected_executable: bool) -> Result<bool> {
854        let metadata = fs::metadata(path).await?;
855        let mode = metadata.permissions().mode();
856        let is_executable = mode & 0o111 != 0;
857        Ok(is_executable == expected_executable)
858    }
859
860    /// Helper function to verify symlink target
861    async fn verify_symlink_target(path: &Path, expected_target: &Path) -> Result<bool> {
862        let target = fs::read_link(path).await?;
863        Ok(target == expected_target)
864    }
865
    /// A file whose content and mtime changed on the source must be the only
    /// action reported, and must end up with the same content on the
    /// destination.
    #[tokio::test]
    async fn test_sync_modified_file() -> Result<()> {
        // Set base time for consistent file timestamps
        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); // 2023-01-01 00:00:00 UTC

        // Setup identical conda environments
        let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
        let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;

        // Modify a file in the source environment
        let modified_content = "modified test data\n";
        let newer_time = base_time + Duration::from_hours(1); // 1 hour later
        modify_file(
            src_env.path(),
            "bin/test-file",
            modified_content,
            newer_time,
        )
        .await?;

        // Sync changes from source to destination
        let actions = sync(src_env.path(), dst_env.path()).await?;

        // Create expected actions map: a single non-executable file receive
        // carrying the source's new mtime.
        let expected_actions = HashMap::from([(
            PathBuf::from("bin/test-file"),
            Action::Receive(newer_time, Receive::File { executable: false }),
        )]);

        // Verify the entire actions map
        assert_eq!(actions, expected_actions);

        // Verify the file was updated in the destination
        assert!(
            verify_file_content(
                &src_env.path().join("bin/test-file"),
                &dst_env.path().join("bin/test-file")
            )
            .await?
        );

        Ok(())
    }
909
910    #[tokio::test]
911    async fn test_sync_new_file() -> Result<()> {
912        // Set base time for consistent file timestamps
913        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); // 2023-01-01 00:00:00 UTC
914
915        // Setup identical conda environments
916        let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
917        let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
918
919        // Add a new file to the source environment
920        let new_file_content = "new file content\n";
921        let newer_time = base_time + Duration::from_hours(1); // 1 hour later
922        add_file(
923            src_env.path(),
924            "lib/new-file.txt",
925            new_file_content,
926            newer_time,
927            false,
928        )
929        .await?;
930
931        // Sync changes from source to destination
932        let actions = sync(src_env.path(), dst_env.path()).await?;
933
934        // Create expected actions map
935        let expected_actions = HashMap::from([
936            //(PathBuf::from("lib"), Action::Directory(newer_time)),
937            (
938                PathBuf::from("lib/new-file.txt"),
939                Action::Receive(newer_time, Receive::File { executable: false }),
940            ),
941        ]);
942
943        // Verify the entire actions map
944        assert_eq!(actions, expected_actions);
945
946        // Verify the new file was created in the destination
947        assert!(
948            verify_file_content(
949                &src_env.path().join("lib/new-file.txt"),
950                &dst_env.path().join("lib/new-file.txt")
951            )
952            .await?
953        );
954
955        Ok(())
956    }
957
958    #[tokio::test]
959    async fn test_sync_directory_creation() -> Result<()> {
960        // Set base time for consistent file timestamps
961        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); // 2023-01-01 00:00:00 UTC
962
963        // Setup identical conda environments
964        let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
965        let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
966
967        // Create a new directory with a file in the source environment
968        let newer_time = base_time + Duration::from_hours(1); // 1 hour later
969        fs::create_dir(src_env.path().join("new_dir")).await?;
970        add_file(
971            src_env.path(),
972            "new_dir/test.txt",
973            "test content",
974            newer_time,
975            false,
976        )
977        .await?;
978        set_mtime(&src_env.path().join("new_dir"), newer_time).await?;
979
980        // Sync changes from source to destination
981        let actions = sync(src_env.path(), dst_env.path()).await?;
982
983        // Create expected actions map
984        let expected_actions = HashMap::from([
985            (PathBuf::from("new_dir"), Action::Directory),
986            (
987                PathBuf::from("new_dir/test.txt"),
988                Action::Receive(newer_time, Receive::File { executable: false }),
989            ),
990        ]);
991
992        // Verify the entire actions map
993        assert_eq!(actions, expected_actions);
994
995        // Verify the directory was created in the destination
996        assert!(dst_env.path().join("new_dir").exists());
997
998        // Verify the file was created in the destination
999        assert!(
1000            verify_file_content(
1001                &src_env.path().join("new_dir/test.txt"),
1002                &dst_env.path().join("new_dir/test.txt")
1003            )
1004            .await?
1005        );
1006
1007        Ok(())
1008    }
1009
1010    #[tokio::test]
1011    async fn test_sync_symlink() -> Result<()> {
1012        // Set base time for consistent file timestamps
1013        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); // 2023-01-01 00:00:00 UTC
1014
1015        // Setup identical conda environments
1016        let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1017        let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1018
1019        // Create a symlink in the source environment
1020        fs::symlink("bin/test-file", src_env.path().join("link-to-test")).await?;
1021
1022        // Set a newer time for the symlink to ensure it's synced
1023        let newer_time = base_time + Duration::from_hours(1); // 1 hour later
1024        set_mtime(&src_env.path().join("link-to-test"), newer_time).await?;
1025
1026        // Sync changes from source to destination
1027        let actions = sync(src_env.path(), dst_env.path()).await?;
1028
1029        // Create expected actions map
1030        let expected_actions = HashMap::from([(
1031            PathBuf::from("link-to-test"),
1032            Action::Receive(newer_time, Receive::Symlink),
1033        )]);
1034
1035        // Verify the entire actions map
1036        assert_eq!(actions, expected_actions);
1037
1038        // Verify the symlink was created in the destination
1039        assert!(dst_env.path().join("link-to-test").exists());
1040
1041        // Verify the symlink target
1042        assert!(
1043            verify_symlink_target(
1044                &dst_env.path().join("link-to-test"),
1045                &PathBuf::from("bin/test-file")
1046            )
1047            .await?
1048        );
1049
1050        Ok(())
1051    }
1052
1053    #[tokio::test]
1054    async fn test_sync_file_deletion() -> Result<()> {
1055        // Set base time for consistent file timestamps
1056        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); // 2023-01-01 00:00:00 UTC
1057
1058        // Setup identical conda environments
1059        let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1060        let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1061
1062        // Add an extra file to the destination that doesn't exist in source
1063        let newer_time = base_time + Duration::from_hours(1); // 1 hour later
1064        add_file(
1065            dst_env.path(),
1066            "extra-file.txt",
1067            "should be deleted",
1068            newer_time,
1069            false,
1070        )
1071        .await?;
1072
1073        // Sync changes from source to destination
1074        let actions = sync(src_env.path(), dst_env.path()).await?;
1075
1076        // Create expected actions map
1077        let expected_actions = HashMap::from([(
1078            PathBuf::from("extra-file.txt"),
1079            Action::Delete { directory: false },
1080        )]);
1081
1082        // Verify the entire actions map
1083        assert_eq!(actions, expected_actions);
1084
1085        // Verify the extra file was deleted from the destination
1086        assert!(!dst_env.path().join("extra-file.txt").exists());
1087
1088        Ok(())
1089    }
1090
1091    #[tokio::test]
1092    async fn test_sync_ignores_pyc_files() -> Result<()> {
1093        // Set base time for consistent file timestamps
1094        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); // 2023-01-01 00:00:00 UTC
1095
1096        // Setup identical conda environments
1097        let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1098        let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1099
1100        // Add a .pyc file to the source.
1101        let newer_time = base_time + Duration::from_hours(1); // 1 hour later
1102        add_file(
1103            src_env.path(),
1104            "lib/test.pyc",
1105            "compiled python",
1106            newer_time,
1107            false,
1108        )
1109        .await?;
1110
1111        // Add a file in __pycache__ directory to the destination
1112        fs::create_dir(dst_env.path().join("lib/__pycache__")).await?;
1113        add_file(
1114            dst_env.path(),
1115            "lib/__pycache__/cached.pyc",
1116            "cached python",
1117            newer_time,
1118            false,
1119        )
1120        .await?;
1121
1122        // Sync changes from source to destination
1123        let actions = sync(src_env.path(), dst_env.path()).await?;
1124
1125        // For this test, we expect an empty actions map since .pyc files are ignored
1126        let expected_actions = HashMap::from([
1127            (
1128                PathBuf::from("lib/__pycache__"),
1129                Action::Delete { directory: true },
1130            ),
1131            (
1132                PathBuf::from("lib/__pycache__/cached.pyc"),
1133                Action::Delete { directory: false },
1134            ),
1135        ]);
1136
1137        // Verify the entire actions map
1138        assert_eq!(actions, expected_actions);
1139
1140        // Verify the .pyc files were deleted (they should be ignored)
1141        assert!(!dst_env.path().join("lib/test.pyc").exists());
1142        assert!(!dst_env.path().join("lib/__pycache__").exists());
1143        assert!(!dst_env.path().join("lib/__pycache__/cached.pyc").exists());
1144
1145        Ok(())
1146    }
1147
1148    #[tokio::test]
1149    async fn test_sync_executable_permissions() -> Result<()> {
1150        // Set base time for consistent file timestamps
1151        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); // 2023-01-01 00:00:00 UTC
1152
1153        // Setup identical conda environments
1154        let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1155        let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1156
1157        // Add an executable file to the source
1158        let newer_time = base_time + Duration::from_hours(1); // 1 hour later
1159        add_file(
1160            src_env.path(),
1161            "bin/executable",
1162            "#!/bin/sh\necho hello",
1163            newer_time,
1164            true,
1165        )
1166        .await?;
1167
1168        // Sync changes from source to destination
1169        let actions = sync(src_env.path(), dst_env.path()).await?;
1170
1171        // Create expected actions map
1172        let expected_actions = HashMap::from([(
1173            PathBuf::from("bin/executable"),
1174            Action::Receive(newer_time, Receive::File { executable: true }),
1175        )]);
1176
1177        // Verify the entire actions map
1178        assert_eq!(actions, expected_actions);
1179
1180        // Verify the file was created in the destination
1181        assert!(dst_env.path().join("bin/executable").exists());
1182
1183        // Verify the file content was synced correctly
1184        assert!(
1185            verify_file_content(
1186                &src_env.path().join("bin/executable"),
1187                &dst_env.path().join("bin/executable")
1188            )
1189            .await?
1190        );
1191
1192        // Verify the executable permissions were preserved
1193        assert!(verify_file_permissions(&dst_env.path().join("bin/executable"), true).await?);
1194
1195        Ok(())
1196    }
1197
1198    #[tokio::test]
1199    async fn test_sync_text_file_prefix_replacement() -> Result<()> {
1200        // Set base time for consistent file timestamps
1201        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); // 2023-01-01 00:00:00 UTC
1202
1203        // Setup conda environments with different prefixes
1204        let src_prefix = "/opt/conda/src";
1205        let dst_prefix = "/opt/conda/dst";
1206        let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1207        let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1208
1209        // Add a text file with prefix references to the source
1210        let newer_time = base_time + Duration::from_hours(1);
1211        let text_content = format!(
1212            "#!/bin/bash\nexport PATH={}/bin:$PATH\necho 'Using prefix: {}'\n",
1213            src_prefix, src_prefix
1214        );
1215        add_file(
1216            src_env.path(),
1217            "bin/script.sh",
1218            &text_content,
1219            newer_time,
1220            true,
1221        )
1222        .await?;
1223
1224        // Add an empty file too.
1225        add_file(src_env.path(), "bin/script2.sh", "", newer_time, true).await?;
1226
1227        // Sync changes from source to destination
1228        let actions = sync(src_env.path(), dst_env.path()).await?;
1229
1230        // Verify the file was synced
1231        let expected_actions = HashMap::from([
1232            (
1233                PathBuf::from("bin/script.sh"),
1234                Action::Receive(newer_time, Receive::File { executable: true }),
1235            ),
1236            (
1237                PathBuf::from("bin/script2.sh"),
1238                Action::Receive(newer_time, Receive::File { executable: true }),
1239            ),
1240        ]);
1241        assert_eq!(actions, expected_actions);
1242
1243        // Verify the prefix was replaced in the destination file
1244        let dst_content = fs::read_to_string(dst_env.path().join("bin/script.sh")).await?;
1245        let expected_content = format!(
1246            "#!/bin/bash\nexport PATH={}/bin:$PATH\necho 'Using prefix: {}'\n",
1247            dst_prefix, dst_prefix
1248        );
1249        assert_eq!(dst_content, expected_content);
1250
1251        Ok(())
1252    }
1253
1254    #[tokio::test]
1255    async fn test_sync_binary_file_prefix_replacement() -> Result<()> {
1256        // Set base time for consistent file timestamps
1257        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200);
1258
1259        // Setup conda environments with different prefixes
1260        let src_prefix = "/opt/conda/src";
1261        let dst_prefix = "/opt/conda/dst";
1262        let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1263        let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1264
1265        // Create a binary file with embedded prefix and null bytes
1266        let newer_time = base_time + Duration::from_hours(1);
1267        let mut binary_content = Vec::new();
1268        binary_content.extend_from_slice(b"\x7fELF"); // ELF magic number
1269        binary_content.extend_from_slice(&[0u8; 10]); // null bytes to make it binary
1270        binary_content.extend_from_slice(src_prefix.as_bytes());
1271        binary_content.extend_from_slice(&[0u8; 20]); // more null bytes
1272        binary_content.extend_from_slice(b"end");
1273
1274        fs::write(src_env.path().join("lib/binary"), &binary_content).await?;
1275        set_mtime(&src_env.path().join("lib/binary"), newer_time).await?;
1276
1277        // Sync changes from source to destination
1278        let actions = sync(src_env.path(), dst_env.path()).await?;
1279
1280        // Verify the file was synced
1281        let expected_actions = HashMap::from([(
1282            PathBuf::from("lib/binary"),
1283            Action::Receive(newer_time, Receive::File { executable: false }),
1284        )]);
1285        assert_eq!(actions, expected_actions);
1286
1287        // Verify the prefix was replaced in the binary file with null padding
1288        let dst_content = fs::read(dst_env.path().join("lib/binary")).await?;
1289
1290        // The original file size should be preserved
1291        assert_eq!(dst_content.len(), binary_content.len());
1292
1293        // Check that the prefix was replaced
1294        let dst_content_str = String::from_utf8_lossy(&dst_content);
1295        assert!(dst_content_str.contains(dst_prefix));
1296        assert!(!dst_content_str.contains(src_prefix));
1297
1298        // Verify the ELF header and end marker are still present
1299        assert!(dst_content.starts_with(b"\x7fELF"));
1300        assert!(dst_content.ends_with(b"end"));
1301
1302        Ok(())
1303    }
1304
1305    #[tokio::test]
1306    async fn test_sync_symlink_prefix_replacement() -> Result<()> {
1307        // Set base time for consistent file timestamps
1308        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200);
1309
1310        // Setup conda environments with different prefixes
1311        let src_prefix = "/opt/conda/src";
1312        let dst_prefix = "/opt/conda/dst";
1313        let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1314        let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1315
1316        // Create a symlink that points to a path with the source prefix
1317        let newer_time = base_time + Duration::from_hours(1);
1318        let symlink_target = format!("{}/lib/target-file", src_prefix);
1319        fs::symlink(&symlink_target, src_env.path().join("bin/link-to-target")).await?;
1320        set_mtime(&src_env.path().join("bin/link-to-target"), newer_time).await?;
1321
1322        // Sync changes from source to destination
1323        let actions = sync(src_env.path(), dst_env.path()).await?;
1324
1325        // Verify the symlink was synced
1326        let expected_actions = HashMap::from([(
1327            PathBuf::from("bin/link-to-target"),
1328            Action::Receive(newer_time, Receive::Symlink),
1329        )]);
1330        assert_eq!(actions, expected_actions);
1331
1332        // Verify the symlink target was updated with the destination prefix
1333        let dst_target = fs::read_link(dst_env.path().join("bin/link-to-target")).await?;
1334        let expected_target = PathBuf::from(format!("{}/lib/target-file", dst_prefix));
1335        assert_eq!(dst_target, expected_target);
1336
1337        Ok(())
1338    }
1339
1340    #[tokio::test]
1341    async fn test_sync_symlink_no_prefix_replacement() -> Result<()> {
1342        // Set base time for consistent file timestamps
1343        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200);
1344
1345        // Setup conda environments with different prefixes
1346        let src_prefix = "/opt/conda/src";
1347        let dst_prefix = "/opt/conda/dst";
1348        let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1349        let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1350
1351        // Create a symlink that points to a relative path (should not be modified)
1352        let newer_time = base_time + Duration::from_hours(1);
1353        let symlink_target = "relative/path/target";
1354        fs::symlink(&symlink_target, src_env.path().join("bin/relative-link")).await?;
1355        set_mtime(&src_env.path().join("bin/relative-link"), newer_time).await?;
1356
1357        // Sync changes from source to destination
1358        let actions = sync(src_env.path(), dst_env.path()).await?;
1359
1360        // Verify the symlink was synced
1361        let expected_actions = HashMap::from([(
1362            PathBuf::from("bin/relative-link"),
1363            Action::Receive(newer_time, Receive::Symlink),
1364        )]);
1365        assert_eq!(actions, expected_actions);
1366
1367        // Verify the symlink target was NOT modified (since it doesn't start with src_prefix)
1368        let dst_target = fs::read_link(dst_env.path().join("bin/relative-link")).await?;
1369        let expected_target = PathBuf::from(symlink_target);
1370        assert_eq!(dst_target, expected_target);
1371
1372        Ok(())
1373    }
1374
1375    #[tokio::test]
1376    async fn test_sync_binary_file_prefix_replacement_fails_when_dst_longer() -> Result<()> {
1377        // Set base time for consistent file timestamps
1378        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200);
1379
1380        // Setup conda environments where destination prefix is longer than source
1381        let src_prefix = "/opt/src"; // Short prefix
1382        let dst_prefix = "/opt/very/long/destination/prefix"; // Much longer prefix
1383        let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1384        let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1385
1386        // Create a binary file with embedded prefix and null bytes
1387        let newer_time = base_time + Duration::from_hours(1);
1388        let mut binary_content = Vec::new();
1389        binary_content.extend_from_slice(b"\x7fELF"); // ELF magic number
1390        binary_content.extend_from_slice(&[0u8; 10]); // null bytes to make it binary
1391        binary_content.extend_from_slice(src_prefix.as_bytes());
1392        binary_content.extend_from_slice(&[0u8; 20]); // more null bytes
1393        binary_content.extend_from_slice(b"end");
1394
1395        fs::write(src_env.path().join("lib/binary"), &binary_content).await?;
1396        set_mtime(&src_env.path().join("lib/binary"), newer_time).await?;
1397
1398        // Sync changes from source to destination - this should fail
1399        let result = sync(src_env.path(), dst_env.path()).await;
1400
1401        // Verify that the sync operation failed due to the destination prefix being longer
1402        assert!(result.is_err());
1403        let error_msg = result.unwrap_err().to_string();
1404        assert!(error_msg.contains("Input is longer than target length"));
1405
1406        Ok(())
1407    }
1408}