The UFZ services GitLab and Mattermost will be unavailable on Monday, January 24 from 06:00 AM to 08:00 AM due to maintenance work.

Commit 48172a72 authored by Adam Reichold
Browse files

Use Rayon to unify parallel reading and listing.

parent d3cc1c15
Pipeline #42388 passed with stage in 1 minute and 24 seconds
......@@ -22,6 +22,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "autocfg"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "bitflags"
version = "1.3.2"
......@@ -68,6 +74,30 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
dependencies = [
"cfg-if",
"crossbeam-utils",
"lazy_static",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
......@@ -78,6 +108,12 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "either"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "filetime"
version = "0.2.15"
......@@ -126,6 +162,25 @@ version = "0.2.102"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2a5ac8f984bfcf3a823267e5fde638acc3325f6496633a5da6bb6eb2171e103"
[[package]]
name = "memoffset"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "par-tar"
version = "0.1.0"
......@@ -133,10 +188,36 @@ dependencies = [
"clap",
"crossbeam-channel",
"glob",
"rayon",
"tar",
"zstd",
]
[[package]]
name = "rayon"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90"
dependencies = [
"autocfg",
"crossbeam-deque",
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"lazy_static",
"num_cpus",
]
[[package]]
name = "redox_syscall"
version = "0.2.10"
......@@ -146,6 +227,12 @@ dependencies = [
"bitflags",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "strsim"
version = "0.8.0"
......
......@@ -8,5 +8,6 @@ resolver = "2"
clap = "2.33"
crossbeam-channel = "0.5"
glob = "0.3"
rayon = "1.5"
tar = "0.4"
zstd = { version = "0.9", default-features = false, features = ["zstdmt", "thin"] }
use std::error::Error;
use std::fs::{read, File};
use std::path::PathBuf;
use std::thread::spawn;
use clap::{crate_authors, crate_name, crate_version, App, Arg};
use crossbeam_channel::{bounded, unbounded};
use crossbeam_channel::{bounded, Sender};
use glob::glob;
use rayon::{
iter::{IntoParallelIterator, ParallelBridge, ParallelIterator},
ThreadPoolBuilder,
};
use tar::{Builder, Header};
use zstd::Encoder;
......@@ -20,79 +25,69 @@ fn main() -> Fallible {
.long("jobs")
.default_value("1"),
)
.arg(
Arg::with_name("WORKERS")
.short("w")
.long("workers")
.default_value("1"),
)
.arg(
Arg::with_name("LEVEL")
.short("l")
.long("level")
.default_value("0"),
)
.arg(
Arg::with_name("WORKERS")
.short("w")
.long("workers")
.default_value("1"),
)
.get_matches();
let output = matches.value_of("OUTPUT").unwrap();
let inputs = matches.values_of("INPUTS").unwrap();
let inputs = matches
.values_of("INPUTS")
.unwrap()
.map(|inputs| inputs.to_owned())
.collect::<Vec<_>>();
let jobs = matches.value_of("JOBS").unwrap().parse::<usize>()?;
let workers = matches.value_of("WORKERS").unwrap().parse::<u32>()?;
let level = matches.value_of("LEVEL").unwrap().parse::<i32>()?;
let workers = matches.value_of("WORKERS").unwrap().parse::<u32>()?;
ThreadPoolBuilder::new().num_threads(jobs).build_global()?;
let (inputs_sender, inputs_receiver) = unbounded();
let (buffers_sender, buffers_receiver) = bounded(jobs);
let mut jobs = (0..jobs)
.map(move |_| {
let inputs_receiver = inputs_receiver.clone();
let buffers_sender = buffers_sender.clone();
fn read_dir(buffers_sender: &Sender<(PathBuf, Vec<u8>)>, dir: PathBuf) -> Fallible {
dir.read_dir()?.par_bridge().try_for_each(|entry| {
let entry = entry?;
let path = entry.path();
spawn(move || -> Fallible {
for input in inputs_receiver {
let buffer = read(&input)?;
if entry.file_type()?.is_dir() {
read_dir(buffers_sender, path)?;
} else {
let buffer = read(&path)?;
buffers_sender.send((input, buffer)).unwrap();
}
buffers_sender.send((path, buffer)).unwrap();
}
Ok(())
})
Ok(())
})
.collect::<Vec<_>>();
jobs.extend(inputs.map(move |inputs| {
let inputs = inputs.to_owned();
let inputs_sender = inputs_sender.clone();
spawn(move || {
let mut dirs = Vec::new();
}
for input in glob(&inputs)? {
let input = input?;
let reader = spawn(move || -> Fallible {
inputs.into_par_iter().try_for_each(|inputs| {
glob(&inputs)?.par_bridge().try_for_each(|input| {
let path = input?;
if input.is_dir() {
dirs.push(input);
if path.is_dir() {
read_dir(&buffers_sender, path)?;
} else {
inputs_sender.send(input).unwrap();
}
}
let buffer = read(&path)?;
while let Some(dir) = dirs.pop() {
for entry in dir.read_dir()? {
let entry = entry?;
let path = entry.path();
if entry.file_type()?.is_dir() {
dirs.push(path);
} else {
inputs_sender.send(path).unwrap();
}
buffers_sender.send((path, buffer)).unwrap();
}
}
Ok(())
Ok(())
})
})
}));
});
let mut encoder = Encoder::new(File::create(output)?, level)?;
encoder.multithread(workers)?;
......@@ -112,9 +107,7 @@ fn main() -> Fallible {
let encoder = builder.into_inner()?;
encoder.finish()?;
for job in jobs {
job.join().unwrap()?;
}
reader.join().unwrap()?;
Ok(())
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment