From 6011d03cb9c9f229489337eb810b4b425a4a44e1 Mon Sep 17 00:00:00 2001 From: Luni-4 Date: Thu, 25 Nov 2021 16:07:50 +0100 Subject: [PATCH 1/2] Add APIs to process the files in a directory concurrently They can be used in different contexts, for example, to count the number of files in a directory, the number of its subdirectories or extract some information contained or scattered through different files. They can also be used to compute some metrics that act on files in some way. --- Cargo.lock | 3 + Cargo.toml | 5 +- src/concurrent_files.rs | 279 ++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 + 4 files changed, 289 insertions(+), 1 deletion(-) create mode 100644 src/concurrent_files.rs diff --git a/Cargo.lock b/Cargo.lock index 9afcc4ea1..ea73f4d32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1554,7 +1554,9 @@ name = "rust-code-analysis" version = "0.0.24" dependencies = [ "aho-corasick", + "crossbeam", "fxhash", + "globset", "lazy_static", "num", "num-derive", @@ -1575,6 +1577,7 @@ dependencies = [ "tree-sitter-python", "tree-sitter-rust", "tree-sitter-typescript", + "walkdir", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 52e218c85..bdd934df5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,16 +12,19 @@ license = "MPL-2.0" [dependencies] aho-corasick = "^0.7" +crossbeam = { version = "^0.8", features = ["crossbeam-channel"] } fxhash = "0.2" +globset = "^0.4" lazy_static = "^1.3" -num-format = "^0.4" num = "^0.4" num-derive = "^0.3" +num-format = "^0.4" num-traits = "^0.2" petgraph = "^0.6" regex = "^1.5" serde = { version = "^1.0", features = ["derive"] } termcolor = "^1.1" +walkdir = "^2.3" tree-sitter = "=0.19.3" tree-sitter-java = "=0.19.0" diff --git a/src/concurrent_files.rs b/src/concurrent_files.rs new file mode 100644 index 000000000..eff3b137b --- /dev/null +++ b/src/concurrent_files.rs @@ -0,0 +1,279 @@ +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::thread; + +use 
crossbeam::channel::{unbounded, Receiver, Sender}; +use globset::GlobSet; +use walkdir::{DirEntry, WalkDir}; + +type ProcFilesFunction = dyn Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync; + +type ProcDirPathsFunction = + dyn Fn(&mut HashMap>, &Path, &Config) + Send + Sync; + +type ProcPathFunction = dyn Fn(&Path, &Config) + Send + Sync; + +// Null functions removed at compile time +fn null_proc_dir_paths(_: &mut HashMap>, _: &Path, _: &Config) {} +fn null_proc_path(_: &Path, _: &Config) {} + +struct JobItem { + path: PathBuf, + cfg: Arc, +} + +type JobReceiver = Receiver>>; +type JobSender = Sender>>; + +fn consumer(receiver: JobReceiver, func: Arc) +where + ProcFiles: Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync, +{ + while let Ok(job) = receiver.recv() { + if job.is_none() { + break; + } + // Cannot panic because of the check immediately above. + let job = job.unwrap(); + let path = job.path.clone(); + + if let Err(err) = func(job.path, &job.cfg) { + eprintln!("{:?} for file {:?}", err, path); + } + } +} + +fn send_file( + path: PathBuf, + cfg: &Arc, + sender: &JobSender, +) -> Result<(), ConcurrentErrors> { + sender + .send(Some(JobItem { + path, + cfg: Arc::clone(cfg), + })) + .map_err(|e| ConcurrentErrors::Sender(e.to_string())) +} + +fn is_hidden(entry: &DirEntry) -> bool { + entry + .file_name() + .to_str() + .map(|s| s.starts_with('.')) + .unwrap_or(false) +} + +fn explore( + files_data: FilesData, + cfg: &Arc, + proc_dir_paths: ProcDirPaths, + proc_path: ProcPath, + sender: &JobSender, +) -> Result>, ConcurrentErrors> +where + ProcDirPaths: Fn(&mut HashMap>, &Path, &Config) + Send + Sync, + ProcPath: Fn(&Path, &Config) + Send + Sync, +{ + let FilesData { + mut paths, + ref include, + ref exclude, + } = files_data; + + let mut all_files: HashMap> = HashMap::new(); + + for path in paths.drain(..) 
{ + let path = PathBuf::from(path); + if !path.exists() { + eprintln!("Warning: File doesn't exist: {:?}", path); + continue; + } + if path.is_dir() { + for entry in WalkDir::new(path) + .into_iter() + .filter_entry(|e| !is_hidden(e)) + { + let entry = match entry { + Ok(entry) => entry, + Err(e) => return Err(ConcurrentErrors::Sender(e.to_string())), + }; + let path = entry.path().to_path_buf(); + if (include.is_empty() || include.is_match(&path)) + && (exclude.is_empty() || !exclude.is_match(&path)) + && path.is_file() + { + proc_dir_paths(&mut all_files, &path, cfg); + send_file(path, cfg, sender)?; + } + } + } else if (include.is_empty() || include.is_match(&path)) + && (exclude.is_empty() || !exclude.is_match(&path)) + && path.is_file() + { + proc_path(&path, cfg); + send_file(path, cfg, sender)?; + } + } + + Ok(all_files) +} + +/// Series of errors that might happen when processing files concurrently. +#[derive(Debug)] +pub enum ConcurrentErrors { + /// Producer side error. + /// + /// An error occurred inside the producer thread. + Producer(String), + /// Sender side error. + /// + /// An error occurred when sending an item. + Sender(String), + /// Receiver side error. + /// + /// An error occurred inside one of the receiver threads. + Receiver(String), + /// Thread side error. + /// + /// A general error occurred when a thread is being spawned or run. + Thread(String), +} + +/// Data related to files. +pub struct FilesData { + /// Kind of files included in a search. + pub include: GlobSet, + /// Kind of files excluded from a search. + pub exclude: GlobSet, + /// List of file paths. + pub paths: Vec, +} + +/// A runner to process files concurrently. +pub struct ConcurrentRunner { + proc_files: Box>, + proc_dir_paths: Box>, + proc_path: Box>, + num_jobs: usize, +} + +impl ConcurrentRunner { + /// Creates a new `ConcurrentRunner`. + /// + /// * `num_jobs` - Number of jobs utilized to process files concurrently. 
+ /// * `proc_files` - Function that processes each file found during + /// the search. + pub fn new(num_jobs: usize, proc_files: ProcFiles) -> Self + where + ProcFiles: 'static + Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync, + { + let num_jobs = std::cmp::max(2, num_jobs) - 1; + Self { + proc_files: Box::new(proc_files), + proc_dir_paths: Box::new(null_proc_dir_paths), + proc_path: Box::new(null_proc_path), + num_jobs, + } + } + + /// Sets the function to process the paths and subpaths contained in a + /// directory. + pub fn set_proc_dir_paths(mut self, proc_dir_paths: ProcDirPaths) -> Self + where + ProcDirPaths: + 'static + Fn(&mut HashMap>, &Path, &Config) + Send + Sync, + { + self.proc_dir_paths = Box::new(proc_dir_paths); + self + } + + /// Sets the function to process a single path. + pub fn set_proc_path(mut self, proc_path: ProcPath) -> Self + where + ProcPath: 'static + Fn(&Path, &Config) + Send + Sync, + { + self.proc_path = Box::new(proc_path); + self + } + + /// Runs the producer-consumer approach to process the files + /// contained in a directory and in its subdirectories. + /// + /// * `config` - Information used to process a file. + /// * `files_data` - Information about the files to be included or excluded + /// from a search, plus the number of paths considered in the search. 
+ pub fn run( + self, + config: Config, + files_data: FilesData, + ) -> Result>, ConcurrentErrors> { + let cfg = Arc::new(config); + + let (sender, receiver) = unbounded(); + + let producer = { + let sender = sender.clone(); + + match thread::Builder::new() + .name(String::from("Producer")) + .spawn(move || { + explore( + files_data, + &cfg, + self.proc_dir_paths, + self.proc_path, + &sender, + ) + }) { + Ok(producer) => producer, + Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())), + } + }; + + let mut receivers = Vec::with_capacity(self.num_jobs); + let proc_files = Arc::new(self.proc_files); + for i in 0..self.num_jobs { + let receiver = receiver.clone(); + let proc_files = proc_files.clone(); + + let t = match thread::Builder::new() + .name(format!("Consumer {}", i)) + .spawn(move || { + consumer(receiver, proc_files); + }) { + Ok(receiver) => receiver, + Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())), + }; + + receivers.push(t); + } + + let all_files = match producer.join() { + Ok(res) => res, + Err(_) => { + return Err(ConcurrentErrors::Producer( + "Child thread panicked".to_owned(), + )) + } + }; + + // Poison the receiver, now that the producer is finished. 
+ for _ in 0..self.num_jobs { + if let Err(e) = sender.send(None) { + return Err(ConcurrentErrors::Sender(e.to_string())); + } + } + + for receiver in receivers { + if receiver.join().is_err() { + return Err(ConcurrentErrors::Receiver( + "A thread used to process a file panicked".to_owned(), + )); + } + } + + all_files + } +} diff --git a/src/lib.rs b/src/lib.rs index 179ecd0e7..5ff13c716 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,6 +107,9 @@ pub use crate::langs::*; mod tools; pub use crate::tools::*; +mod concurrent_files; +pub use crate::concurrent_files::*; + mod traits; pub use crate::traits::*; From 890ebf7d181173798cc80ad843ed7d244d492ce2 Mon Sep 17 00:00:00 2001 From: Luni-4 Date: Fri, 26 Nov 2021 15:30:05 +0100 Subject: [PATCH 2/2] cli: Update cli --- Cargo.lock | 2 - rust-code-analysis-cli/Cargo.toml | 2 - rust-code-analysis-cli/src/main.rs | 175 ++++++----------------------- 3 files changed, 35 insertions(+), 144 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ea73f4d32..9229f2a7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1585,7 +1585,6 @@ name = "rust-code-analysis-cli" version = "0.0.24" dependencies = [ "clap", - "crossbeam", "globset", "num_cpus", "regex", @@ -1595,7 +1594,6 @@ dependencies = [ "serde_json", "serde_yaml", "toml", - "walkdir", ] [[package]] diff --git a/rust-code-analysis-cli/Cargo.toml b/rust-code-analysis-cli/Cargo.toml index 64785a683..3d5ec5f1c 100644 --- a/rust-code-analysis-cli/Cargo.toml +++ b/rust-code-analysis-cli/Cargo.toml @@ -13,7 +13,6 @@ name = "rust-code-analysis-cli" [dependencies] clap = "^2.34" -crossbeam = "^0.8" globset = "^0.4" num_cpus = "^1.13" regex = "^1.5" @@ -23,4 +22,3 @@ serde_cbor = "^0.11" serde_json = "^1.0" serde_yaml = "^0.8" toml = "^0.5" -walkdir = "^2.3" diff --git a/rust-code-analysis-cli/src/main.rs b/rust-code-analysis-cli/src/main.rs index 3b1092b52..94247fd55 100644 --- a/rust-code-analysis-cli/src/main.rs +++ b/rust-code-analysis-cli/src/main.rs @@ -1,6 +1,5 @@ #[macro_use] 
extern crate clap; -extern crate crossbeam; extern crate num_cpus; extern crate serde; extern crate serde_cbor; @@ -11,15 +10,13 @@ extern crate toml; mod formats; use clap::{App, Arg}; -use crossbeam::channel::{unbounded, Receiver, Sender}; use globset::{Glob, GlobSet, GlobSetBuilder}; use std::collections::{hash_map, HashMap}; use std::fmt; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; +use std::process; use std::str::FromStr; use std::sync::{Arc, Mutex}; -use std::{process, thread}; -use walkdir::{DirEntry, WalkDir}; use formats::Format; @@ -28,8 +25,9 @@ use rust_code_analysis::LANG; // Structs use rust_code_analysis::{ - CommentRm, CommentRmCfg, Count, CountCfg, Dump, DumpCfg, Find, FindCfg, Function, FunctionCfg, - Metrics, MetricsCfg, OpsCfg, OpsCode, PreprocParser, PreprocResults, + CommentRm, CommentRmCfg, ConcurrentRunner, Count, CountCfg, Dump, DumpCfg, FilesData, Find, + FindCfg, Function, FunctionCfg, Metrics, MetricsCfg, OpsCfg, OpsCode, PreprocParser, + PreprocResults, }; // Functions @@ -48,6 +46,7 @@ struct Config { comments: bool, find_filter: Vec, count_filter: Vec, + language: Option, function: bool, metrics: bool, ops: bool, @@ -61,15 +60,6 @@ struct Config { count_lock: Option>>, } -struct JobItem { - language: Option, - path: PathBuf, - cfg: Arc, -} - -type JobReceiver = Receiver>; -type JobSender = Sender>; - fn mk_globset(elems: clap::Values) -> GlobSet { let mut globset = GlobSetBuilder::new(); for e in elems { @@ -86,14 +76,14 @@ fn mk_globset(elems: clap::Values) -> GlobSet { } } -fn act_on_file(language: Option, path: PathBuf, cfg: &Config) -> std::io::Result<()> { +fn act_on_file(path: PathBuf, cfg: &Config) -> std::io::Result<()> { let source = if let Some(source) = read_file_with_eol(&path)? 
{ source } else { return Ok(()); }; - let language = if let Some(language) = language { + let language = if let Some(language) = cfg.language { language } else if let Some(language) = guess_language(&source, &path).0 { language @@ -174,90 +164,18 @@ fn act_on_file(language: Option, path: PathBuf, cfg: &Config) -> std::io:: } } -fn consumer(receiver: JobReceiver) { - while let Ok(job) = receiver.recv() { - if job.is_none() { - break; - } - let job = job.unwrap(); - let path = job.path.clone(); - - if let Err(err) = act_on_file(job.language, job.path, &job.cfg) { - eprintln!("{:?} for file {:?}", err, path); - } - } -} - -fn send_file(path: PathBuf, cfg: &Arc, language: Option, sender: &JobSender) { - sender - .send(Some(JobItem { - language, - path, - cfg: Arc::clone(cfg), - })) - .unwrap(); -} - -fn is_hidden(entry: &DirEntry) -> bool { - entry - .file_name() - .to_str() - .map(|s| s.starts_with('.')) - .unwrap_or(false) -} - -fn explore( - mut paths: Vec, - cfg: &Arc, - include: GlobSet, - exclude: GlobSet, - language: Option, - sender: &JobSender, -) -> HashMap> { - let mut all_files: HashMap> = HashMap::new(); - - for path in paths.drain(..) 
{ - let path = PathBuf::from(path); - if !path.exists() { - eprintln!("Warning: File doesn't exist: {}", path.to_str().unwrap()); - continue; - } - if path.is_dir() { - for entry in WalkDir::new(path) - .into_iter() - .filter_entry(|e| !is_hidden(e)) - { - let entry = entry.unwrap(); - let path = entry.path().to_path_buf(); - if (include.is_empty() || include.is_match(&path)) - && (exclude.is_empty() || !exclude.is_match(&path)) - && path.is_file() - { - if cfg.preproc_lock.is_some() { - let file_name = path.file_name().unwrap().to_str().unwrap().to_string(); - let path = path.clone(); - match all_files.entry(file_name) { - hash_map::Entry::Occupied(l) => { - l.into_mut().push(path); - } - hash_map::Entry::Vacant(p) => { - p.insert(vec![path]); - } - }; - } - - send_file(path, cfg, language, sender); - } +fn process_dir_path(all_files: &mut HashMap>, path: &Path, cfg: &Config) { + if cfg.preproc_lock.is_some() { + let file_name = path.file_name().unwrap().to_str().unwrap().to_string(); + match all_files.entry(file_name) { + hash_map::Entry::Occupied(l) => { + l.into_mut().push(path.to_path_buf()); } - } else if (include.is_empty() || include.is_match(&path)) - && (exclude.is_empty() || !exclude.is_match(&path)) - && path.is_file() - { - send_file(path, cfg, language, sender); - } + hash_map::Entry::Vacant(p) => { + p.insert(vec![path.to_path_buf()]); + } + }; } - - all_files } fn parse_or_exit(s: &str) -> T @@ -503,12 +421,16 @@ fn main() { None }; - let cfg = Arc::new(Config { + let include = mk_globset(matches.values_of("include").unwrap()); + let exclude = mk_globset(matches.values_of("exclude").unwrap()); + + let cfg = Config { dump, in_place, comments, find_filter, count_filter, + language, function, metrics, ops, @@ -520,51 +442,24 @@ fn main() { preproc_lock: preproc_lock.clone(), preproc, count_lock: count_lock.clone(), - }); - - let (sender, receiver) = unbounded(); - - let producer = { - let sender = sender.clone(); - let include = 
mk_globset(matches.values_of("include").unwrap()); - let exclude = mk_globset(matches.values_of("exclude").unwrap()); - - thread::Builder::new() - .name(String::from("Producer")) - .spawn(move || explore(paths, &cfg, include, exclude, language, &sender)) - .unwrap() }; - let mut receivers = Vec::with_capacity(num_jobs); - for i in 0..num_jobs { - let receiver = receiver.clone(); - - let t = thread::Builder::new() - .name(format!("Consumer {}", i)) - .spawn(move || { - consumer(receiver); - }) - .unwrap(); - - receivers.push(t); - } - - let all_files = if let Ok(res) = producer.join() { - res - } else { - process::exit(1); + let files_data = FilesData { + include, + exclude, + paths, }; - // Poison the receiver, now that the producer is finished. - for _ in 0..num_jobs { - sender.send(None).unwrap(); - } - - for receiver in receivers { - if receiver.join().is_err() { + let all_files = match ConcurrentRunner::new(num_jobs, act_on_file) + .set_proc_dir_paths(process_dir_path) + .run(cfg, files_data) + { + Ok(all_files) => all_files, + Err(e) => { + eprintln!("{:?}", e); process::exit(1); } - } + }; if let Some(count) = count_lock { let count = Arc::try_unwrap(count).unwrap().into_inner().unwrap();