-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdirectory.rs
129 lines (119 loc) · 5.75 KB
/
directory.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Code for handling directories
use rayon::prelude::*;
use std::{
fs,
marker::{Send, Sync},
path::{Path, PathBuf},
sync::Mutex,
};
use crate::BattleToolsError;
/// Per-file parsing hooks used when walking log directories.
///
/// Implement this for any type that wants to turn log files into values of type `R`.
pub trait LogParser<R> {
    /// Optional shortcut for handling a log file.
    ///
    /// Only the path is supplied (not the JSON contents) because some use cases do not need
    /// the entire file.
    ///
    /// Returns `Some(R)` if a result could be produced early, or `None` if no decision could be
    /// made and [`LogParser::handle_log_file`] has to run. Errors should be folded into `None`
    /// so that error handling is not duplicated: the subsequent `handle_log_file` call deals
    /// with them.
    ///
    /// The default implementation always returns `None`.
    fn fast_handle_log_file(&self, _file_path: &Path) -> Option<R> {
        None
    }

    /// Parses the JSON contents of a single log file.
    ///
    /// `raw_json` is the file's full text and `file_path` is where it was read from.
    fn handle_log_file(&self, raw_json: String, file_path: &Path) -> Result<R, BattleToolsError>;

    /// Consumes the parse results gathered from an entire directory tree.
    ///
    /// Called only once per invocation of `ParallelDirectoryParser::handle_directories`: when
    /// subdirectories are found, their results are merged into the same `results` vector
    /// before this method runs.
    fn handle_results(&mut self, results: Vec<R>) -> Result<(), BattleToolsError>;
}
/// Walks directories and runs parsing code on every log file found, in parallel.
pub trait ParallelDirectoryParser<R> {
    /// Processes every directory in `dirs`.
    ///
    /// When `exclusion` is `Some`, any directory or file that matches the exclusion is
    /// skipped.
    fn handle_directories(
        &mut self,
        dirs: Vec<PathBuf>,
        exclusion: Option<String>,
    ) -> Result<(), BattleToolsError>;
}
impl<T, R> ParallelDirectoryParser<R> for T
where
T: LogParser<R> + Sync + Send,
R: Send,
{
fn handle_directories(
&mut self,
dirs: Vec<PathBuf>,
exclusion: Option<String>,
) -> Result<(), BattleToolsError> {
// We don't know if we'll get a directory with lots of subdirectories or one with lots of JSON files,
// so we always use parallel iteration.
// We have a Mutex<Vec<PathBuf>> to store directories we need to check. We start with the provided directory,
// and add to this mutex whenever we find a subdirectory. This means that we can collect ALL the results into
// one Vec, and guarantee that handle_results is only called once. It's not too much of a performance to use a mutex,
// since in PS log structures, there are not generally JSON logs and subdirectories in the same directory.
let mut results: Vec<R> = vec![];
let subdirectories_mutex = Mutex::new(dirs);
let handle_specific_dir = |p: &PathBuf| -> Result<Vec<R>, BattleToolsError> {
eprintln!("Parsing {}...", p.display());
let result_vec = fs::read_dir(p)?
.collect::<Vec<_>>()
.par_iter()
.filter_map(|file| {
if let Ok(entry) = file.as_ref() {
if let Some(exclude) = &exclusion {
if entry.file_name().to_string_lossy().contains(exclude) {
return None;
}
}
if entry.file_type().ok()?.is_dir() {
// We found a subdirectory! Add it to the list of directories to process,
// then return None (there's no parsed data for a subdirectory!)
match subdirectories_mutex.lock() {
Ok(mut s) => s.push(entry.path()),
Err(e) => {
// We can't just propagate this error, since we're in a filter_map
// If the mutex is poisoned we can probably just panic, since the mutex
// will only be poisoned if another thread panics.
panic!(
"Mutex for list of directories to process poisoned (this is probably because of another error): {}",
e
);
}
};
return None;
}
let path = entry.path();
if let Some(res) = self.fast_handle_log_file(&path) {
return Some(res);
};
let raw_json = match fs::read_to_string(entry.path()) {
Ok(s) => s,
Err(e) => {
eprintln!("Error reading file {:?}: {:?}", path, e);
return None;
}
};
match self.handle_log_file(raw_json, &path) {
Ok(res) => Some(res),
Err(e) => {
eprintln!("Error parsing file {:?}: {:?}", path, e);
None
}
}
} else {
None
}
})
.collect();
Ok(result_vec)
};
// get_next_dir is a closure so that the mutex is unlocked before handle_specific_dir is called.
// Otherwise, the Rayon filter_map threads block infinitely when trying to lock the mutex (to add a subdirectory)
let get_next_dir = || subdirectories_mutex.lock().unwrap().pop();
while let Some(dir) = get_next_dir() {
if let Ok(r) = handle_specific_dir(&dir) {
results.extend(r);
}
}
self.handle_results(results)
}
}