Check files in parallel with crossbeam (#6783)

* chore: add `crossbeam-channel` as a dependency for the mpmc channel

* refactor: package up all the git repository details in `Repository`

* refactor: encapsulate logic for cloning repositories

* refactor: add `check_diff_for_file` to support parallel file checks

Processing all files in parallel is more efficient than checking each
repo in parallel.

One issue with the current design of processing each repo in parallel is
that threads that process smaller repos end early and don't help process
files from larger repos. For example, r-l/rust is a large repo that
takes a long time to check because there's only one thread working on
it.

Besides enabling us to process all files in parallel,
`check_diff_for_file` returns a `Result<(), (Diff, ..)>` instead of a
`u8`, which is a more idiomatic way to represent any errors we find when
checking for diffs.

* refactor: process files in parallel and report errors at the end

Now we'll process each file in parallel instead of processing each repo
in parallel, and after checking all repositories we'll report on any
errors that we've found.

* feat: Add `worker_threads` option

This option controls how many threads check files in parallel.
The default is 16, as that's a multiple of common CPU core counts.
diff --git a/check_diff/Cargo.lock b/check_diff/Cargo.lock
index 95bdd32..e8314b4 100644
--- a/check_diff/Cargo.lock
+++ b/check_diff/Cargo.lock
@@ -1,6 +1,6 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
+version = 4
 
 [[package]]
 name = "aho-corasick"
@@ -77,6 +77,7 @@
 version = "0.1.0"
 dependencies = [
  "clap",
+ "crossbeam-channel",
  "diffy",
  "tempfile",
  "tracing",
@@ -131,6 +132,21 @@
 checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422"
 
 [[package]]
+name = "crossbeam-channel"
+version = "0.5.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
+
+[[package]]
 name = "diffy"
 version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/check_diff/Cargo.toml b/check_diff/Cargo.toml
index 877735e..5a4f608 100644
--- a/check_diff/Cargo.toml
+++ b/check_diff/Cargo.toml
@@ -12,3 +12,4 @@
 tempfile = "3"
 walkdir = "2.5.0"
 diffy = "0.4.0"
+crossbeam-channel = "0.5.15"
diff --git a/check_diff/src/lib.rs b/check_diff/src/lib.rs
index eb6c1d4..2ef16d4 100644
--- a/check_diff/src/lib.rs
+++ b/check_diff/src/lib.rs
@@ -1,11 +1,14 @@
 use std::borrow::Cow;
+use std::collections::HashMap;
 use std::env;
 use std::fmt::{Debug, Display};
 use std::io::{self, Write};
 use std::path::{Path, PathBuf};
 use std::process::{Command, Stdio};
 use std::str::FromStr;
-use tracing::{debug, error, info, trace};
+use std::sync::{Arc, Mutex};
+use tempfile::tempdir;
+use tracing::{debug, info, trace, warn};
 use walkdir::WalkDir;
 
 #[derive(Debug, Clone, Copy)]
@@ -411,6 +414,57 @@
 
     Cow::Owned(result)
 }
+
+/// A git repository that has been checked out on the local file system.
+///
+/// `P` holds the checkout directory; the path accessors require `P: AsRef<Path>` (e.g. a
+/// `PathBuf`, or the `tempfile::TempDir` returned by `clone_repositories_for_diff_check`).
+#[derive(Debug)]
+pub struct Repository<P> {
+    /// Name of the repository, parsed from its git URL with `get_repo_name`
+    name: String,
+    /// Path to the repository on the local file system
+    dir_path: P,
+}
+
+impl<P> Repository<P> {
+    /// Create a `Repository` whose name is parsed from `git_url` and whose checkout lives at
+    /// `dir_path`. This does not clone anything.
+    pub fn new(git_url: &str, dir_path: P) -> Self {
+        let name = get_repo_name(git_url).to_string();
+        Self { name, dir_path }
+    }
+
+    /// Get the `name` of the repository
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    /// Get the path to the directory this repository was cloned into.
+    ///
+    /// This is the path held by `dir_path`, returned as-is: it is only absolute if the caller
+    /// supplied an absolute path.
+    pub fn path(&self) -> &Path
+    where
+        P: AsRef<Path>,
+    {
+        self.dir_path.as_ref()
+    }
+
+    /// Get the path of `file` relative to this repository's directory.
+    ///
+    /// If `file` is not located inside the repository's directory, it is returned unchanged.
+    pub fn relative_path<'f, F>(&self, file: &'f F) -> &'f Path
+    where
+        P: AsRef<Path>,
+        F: AsRef<Path>,
+    {
+        file.as_ref()
+            .strip_prefix(self.dir_path.as_ref())
+            .unwrap_or(file.as_ref())
+    }
+}
+
 /// Clone a git repository
 ///
 /// Parameters:
@@ -641,76 +684,128 @@
     })
 }
 
+/// Clone every repository in `repositories` into its own temporary directory, in parallel.
+///
+/// URLs are deduplicated by repository name (as parsed by `get_repo_name`) before any cloning
+/// starts, so each name is cloned at most once. The first URL seen for a name is kept and any
+/// later URL with the same name is skipped (with a warning if it is a different URL).
+///
+/// Repositories whose temporary directory can't be created, or that fail to clone, are logged
+/// with `warn!` and left out of the result. The order of the returned repositories is
+/// unspecified.
+pub fn clone_repositories_for_diff_check(
+    repositories: &[&str],
+) -> Vec<Repository<tempfile::TempDir>> {
+    // Deduplicate before spawning any threads so the same repository is never cloned twice.
+    let mut unique_urls: HashMap<&str, &str> = HashMap::new();
+    for &url in repositories {
+        let repo_name = get_repo_name(url);
+        let kept_url = *unique_urls.entry(repo_name).or_insert(url);
+        if kept_url != url {
+            warn!("Skipping {url}: '{repo_name}' will be cloned from {kept_url}");
+        }
+    }
+
+    // Scoped threads may borrow this directly, so it doesn't need to be wrapped in an `Arc`.
+    let cloned = Mutex::new(Vec::with_capacity(unique_urls.len()));
+
+    std::thread::scope(|s| {
+        for (&repo_name, &url) in &unique_urls {
+            let cloned = &cloned;
+
+            s.spawn(move || {
+                info!("Processing repo: {repo_name}");
+                let Ok(tmp_dir) = tempdir() else {
+                    warn!(
+                        "Failed to create a tempdir for {}. Can't check formatting diff for {}",
+                        &url, repo_name
+                    );
+                    return;
+                };
+
+                let Ok(_) = clone_git_repo(url, tmp_dir.path()) else {
+                    warn!(
+                        "Failed to clone repo {}. Can't check formatting diff for {}",
+                        &url, repo_name
+                    );
+                    return;
+                };
+
+                let repo = Repository::new(url, tmp_dir);
+                cloned.lock().unwrap().push(repo);
+            });
+        }
+    });
+
+    // `thread::scope` re-raises any panic from the clone threads, but stay tolerant of a
+    // poisoned lock anyway and return whatever repositories were cloned.
+    cloned
+        .into_inner()
+        .unwrap_or_else(std::sync::PoisonError::into_inner)
+}
+
-/// Calculates the number of errors when running the compiled binary and the feature binary on the
-/// repo specified with the specific configs.
-pub fn check_diff<P: AsRef<Path>>(
+/// Format `file` (a file inside `repo`) with both rustfmt runners and compare the results.
+///
+/// Returns `Err((diff, file, repo))` when the two formatted outputs differ, so the caller can
+/// collect the diff and report it later. Returns `Ok(())` when the outputs are identical, and
+/// also when the `main` rustfmt, the `feature` rustfmt, or both fail to format the file: those
+/// failures are only logged at `debug` level and are not counted as formatting diffs.
+pub fn check_diff_for_file<'repo, P: AsRef<Path>, F: AsRef<Path>>(
     runners: &CheckDiffRunners<impl CodeFormatter, impl CodeFormatter>,
-    repo: P,
-    repo_url: &str,
-) -> u8 {
-    let mut errors: u8 = 0;
-    let repo = repo.as_ref();
-    let iter = search_for_rs_files(repo);
-    for file in iter {
-        let relative_path = file.strip_prefix(repo).unwrap_or(&file);
-        let repo_name = get_repo_name(repo_url);
+    repo: &'repo Repository<P>,
+    file: F,
+) -> Result<(), (Diff, F, &'repo Repository<P>)> {
+    let relative_path = repo.relative_path(&file);
+    let repo_name = repo.name();
 
-        trace!(
-            "Formatting '{0}' file {0}/{1}",
-            repo_name,
-            relative_path.display()
-        );
+    trace!(
+        "Formatting '{0}' file {0}/{1}",
+        repo_name,
+        relative_path.display()
+    );
 
-        match runners.create_diff(file.as_path()) {
-            Ok(diff) => {
-                if !diff.is_empty() {
-                    error!(
-                        "Diff found in '{0}' when formatting {0}/{1}\n{2}",
-                        repo_name,
-                        relative_path.display(),
-                        diff,
-                    );
-                    errors = errors.saturating_add(1);
-                } else {
-                    trace!(
-                        "No diff found in '{0}' when formatting {0}/{1}",
-                        repo_name,
-                        relative_path.display(),
-                    )
-                }
-            }
-            Err(CreateDiffError::MainRustfmtFailed(e)) => {
-                debug!(
-                    "`main` rustfmt failed to format {}/{}\n{:?}",
+    match runners.create_diff(file.as_ref()) {
+        Ok(diff) => {
+            if !diff.is_empty() {
+                Err((diff, file, repo))
+            } else {
+                trace!(
+                    "No diff found in '{0}' when formatting {0}/{1}",
                     repo_name,
                     relative_path.display(),
-                    e,
                 );
-                continue;
-            }
-            Err(CreateDiffError::FeatureRustfmtFailed(e)) => {
-                debug!(
-                    "`feature` rustfmt failed to format {}/{}\n{:?}",
-                    repo_name,
-                    relative_path.display(),
-                    e,
-                );
-                continue;
-            }
-            Err(CreateDiffError::BothRustfmtFailed { src, feature }) => {
-                debug!(
-                    "Both rustfmt binaries failed to format {}/{}\n{:?}\n{:?}",
-                    repo_name,
-                    relative_path.display(),
-                    src,
-                    feature,
-                );
-                continue;
+                Ok(())
             }
         }
+        Err(CreateDiffError::MainRustfmtFailed(e)) => {
+            debug!(
+                "`main` rustfmt failed to format {}/{}\n{:?}",
+                repo_name,
+                relative_path.display(),
+                e,
+            );
+            Ok(())
+        }
+        Err(CreateDiffError::FeatureRustfmtFailed(e)) => {
+            debug!(
+                "`feature` rustfmt failed to format {}/{}\n{:?}",
+                repo_name,
+                relative_path.display(),
+                e,
+            );
+            Ok(())
+        }
+        Err(CreateDiffError::BothRustfmtFailed { src, feature }) => {
+            debug!(
+                "Both rustfmt binaries failed to format {}/{}\n{:?}\n{:?}",
+                repo_name,
+                relative_path.display(),
+                src,
+                feature,
+            );
+            Ok(())
+        }
     }
-
-    errors
 }
 
 /// parse out the repository name from a GitHub Repository name.
@@ -721,3 +799,73 @@
         .unwrap_or(("", strip_git_prefix));
     repo_name
 }
+
+/// Check every file found by `search_for_rs_files` in `repositories` for formatting diffs.
+///
+/// Work is spread across threads: one producer thread per repository walks that repository and
+/// sends each file over a channel, while `worker_threads` consumer threads pull files from the
+/// channel and run [`check_diff_for_file`] on them. Because files from every repository share
+/// one queue, a large repository is checked by all workers instead of by a single thread.
+///
+/// Returns one `(diff, file, repository)` entry for each file whose formatting differs between
+/// the two rustfmt runners. The order of the entries is unspecified.
+///
+/// # Panics
+///
+/// Panics if any producer or worker thread panics; `std::thread::scope` re-raises the panic
+/// after all threads have been joined.
+pub fn check_diff<'repo, P, F, M>(
+    runners: &CheckDiffRunners<F, M>,
+    repositories: &'repo [Repository<P>],
+    worker_threads: std::num::NonZeroU8,
+) -> Vec<(Diff, PathBuf, &'repo Repository<P>)>
+where
+    P: AsRef<Path> + Sync + Send,
+    F: CodeFormatter + Sync,
+    M: CodeFormatter + Sync,
+{
+    let (tx, rx) = crossbeam_channel::unbounded();
+
+    let errors = std::thread::scope(|s| {
+        // Spawn one producer thread per repository to find the files to check.
+        for repo in repositories.iter() {
+            let tx = tx.clone();
+            s.spawn(move || {
+                for file in search_for_rs_files(repo.path()) {
+                    // `send` only fails if every receiver has been dropped, which can't happen
+                    // while this function still owns `rx`.
+                    let _ = tx.send((file, repo));
+                }
+            });
+        }
+
+        // Drop the original `tx` so that the only senders left are the ones owned by the
+        // producer threads. Once every producer has finished and the channel has been drained,
+        // `rx.recv()` returns `Err(RecvError)` and the workers below stop. When all scoped
+        // threads have ended we're done processing and can return the errors to the caller.
+        drop(tx);
+
+        let errors = Arc::new(Mutex::new(Vec::new()));
+
+        // Spawn the worker threads that check each file.
+        for _ in 0..u8::from(worker_threads) {
+            let errors = Arc::clone(&errors);
+            let rx = rx.clone();
+            s.spawn(move || {
+                while let Ok((file, repo)) = rx.recv() {
+                    if let Err(e) = check_diff_for_file(runners, repo, file) {
+                        // Push errors to report on later
+                        errors.lock().unwrap().push(e);
+                    }
+                }
+            });
+        }
+        errors
+    });
+
+    // `thread::scope` re-raises worker panics, but stay tolerant of a poisoned lock anyway.
+    Arc::into_inner(errors)
+        .expect("All other threads are done")
+        .into_inner()
+        .unwrap_or_else(std::sync::PoisonError::into_inner)
+}
diff --git a/check_diff/src/main.rs b/check_diff/src/main.rs
index b36ab18..cc776dd 100644
--- a/check_diff/src/main.rs
+++ b/check_diff/src/main.rs
@@ -1,15 +1,12 @@
 use std::io::Error;
 use std::process::ExitCode;
-use std::sync::Arc;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::thread;
 
 use check_diff::{
-    Edition, StyleEdition, check_diff, clone_git_repo, compile_rustfmt, get_repo_name,
+    Edition, StyleEdition, check_diff, clone_repositories_for_diff_check, compile_rustfmt,
 };
 use clap::Parser;
 use tempfile::tempdir;
-use tracing::{error, info, warn};
+use tracing::{error, info};
 
 /// A curated set of `rust-lang/*` and popular ecosystem repositories to compare `rustfmt`s against.
 const REPOS: &[&str] = &[
@@ -60,6 +57,10 @@
     /// pass when running the feature branch
     #[arg(value_delimiter = ',', short, long, num_args = 1..)]
     rustfmt_config: Option<Vec<String>>,
+    /// How many threads should check for formatting diffs.
+    // Choosing 16 as the default since that's a common multiple of available CPU cores.
+    #[arg(short, long, default_value_t = std::num::NonZeroU8::new(16).unwrap())]
+    worker_threads: std::num::NonZeroU8,
 }
 
 fn main() -> Result<ExitCode, Error> {
@@ -88,47 +89,29 @@
         }
     };
 
-    let errors = Arc::new(AtomicUsize::new(0));
-    let check_diff_runners = Arc::new(check_diff_runners);
+    // Clone all repositories we plan to check
+    let repositories = clone_repositories_for_diff_check(REPOS);
 
-    thread::scope(|s| {
-        for url in REPOS {
-            let errors = Arc::clone(&errors);
-            let check_diff_runners = Arc::clone(&check_diff_runners);
-            s.spawn(move || {
-                let repo_name = get_repo_name(url);
-                info!("Processing repo: {repo_name}");
-                let Ok(tmp_dir) = tempdir() else {
-                    warn!(
-                        "Failed to create a tempdir for {}. Can't check formatting diff for {}",
-                        &url, repo_name
-                    );
-                    return;
-                };
+    info!("Starting the Diff Check");
+    let errors = check_diff(&check_diff_runners, &repositories, args.worker_threads);
 
-                let Ok(_) = clone_git_repo(url, tmp_dir.path()) else {
-                    warn!(
-                        "Failed to clone repo {}. Can't check formatting diff for {}",
-                        &url, repo_name
-                    );
-                    return;
-                };
-
-                let error_count = check_diff(&check_diff_runners, tmp_dir.path(), url);
-
-                errors.fetch_add(error_count as usize, Ordering::Relaxed);
-            });
-        }
-    });
-
-    let error_count = Arc::into_inner(errors)
-        .expect("All other threads are done")
-        .load(Ordering::Relaxed);
-    if error_count > 0 {
-        error!("{error_count} formatting diffs found 💔");
-        Ok(ExitCode::FAILURE)
-    } else {
+    if errors.is_empty() {
         info!("No diff found 😊");
-        Ok(ExitCode::SUCCESS)
+        return Ok(ExitCode::SUCCESS);
     }
+
+    for (diff, file, repo) in errors.iter() {
+        let repo_name = repo.name();
+        let relative_path = repo.relative_path(&file);
+
+        error!(
+            "Diff found in '{0}' when formatting {0}/{1}\n{2}",
+            repo_name,
+            relative_path.display(),
+            diff,
+        );
+    }
+
+    error!("{} formatting diffs found 💔", errors.len());
+    Ok(ExitCode::FAILURE)
 }
diff --git a/check_diff/tests/check_diff.rs b/check_diff/tests/check_diff.rs
index 6ec02b1..e5a934a 100644
--- a/check_diff/tests/check_diff.rs
+++ b/check_diff/tests/check_diff.rs
@@ -1,5 +1,5 @@
 use check_diff::{
-    CheckDiffError, CheckDiffRunners, CodeFormatter, FormatCodeError, check_diff,
+    CheckDiffError, CheckDiffRunners, CodeFormatter, FormatCodeError, Repository, check_diff,
     search_for_rs_files,
 };
 use std::fs::File;
@@ -70,10 +70,12 @@
     let dir = Builder::new().tempdir_in("").unwrap();
     let file_path = dir.path().join("test.rs");
     let _tmp_file = File::create(file_path)?;
-    let repo_url = "https://github.com/rust-lang/rustfmt.git";
+    let repo = Repository::new("https://github.com/rust-lang/rustfmt.git", dir);
+    let repos = [repo];
+    let workers = std::num::NonZeroU8::new(1).unwrap();
 
-    let errors = check_diff(&runners, dir.path(), repo_url);
-    assert_eq!(errors, 0);
+    let errors = check_diff(&runners, &repos, workers);
+    assert_eq!(errors.len(), 0);
     Ok(())
 }
 
@@ -83,9 +85,11 @@
     let dir = Builder::new().tempdir_in("").unwrap();
     let file_path = dir.path().join("test.rs");
     let _tmp_file = File::create(file_path)?;
-    let repo_url = "https://github.com/rust-lang/rustfmt.git";
+    let repo = Repository::new("https://github.com/rust-lang/rustfmt.git", dir);
+    let repos = [repo];
+    let workers = std::num::NonZeroU8::new(1).unwrap();
 
-    let errors = check_diff(&runners, dir.path(), repo_url);
-    assert_ne!(errors, 0);
+    let errors = check_diff(&runners, &repos, workers);
+    assert_ne!(errors.len(), 0);
     Ok(())
 }