1
- cat(
2
- " WTISEN Result Pre-processing" ,
3
- " ############################" ,
4
- sep = " \n " )
1
+ # #############
2
+ # Parameters #
3
+ # #############
5
4
6
- cat(" \n Working directory:" , getwd()," \n " )
5
+ library(optparse )
6
+
7
# Timestamped console logger.
#   ...  Values to print; they are handed to cat(), so vectors are flattened.
#   sep  Separator placed between the values supplied in `...`.
# Every message starts on a new line and is prefixed with the current time
# formatted as "YYYY-MM-DD HH:MM:SS" followed by a single space.
logger = function(..., sep = " ") {
  stamp = format(Sys.time(), format = "%Y-%m-%d %H:%M:%S")
  # Emit the prefix with a fixed separator so that a caller-supplied `sep`
  # only affects the message parts and can never split up the timestamp.
  cat("\n", stamp, " ", sep = "")
  cat(..., sep = sep)
}
9
+
10
# Command-line interface definition.
# Both path options default to the empty string; the rest of the script
# compares against that value to detect an option that was not supplied.
parser = OptionParser(
  option_list = list(

    # Path of the file to read
    make_option(
      opt_str = c("-i", "--input"),
      help = "Input file, in CSV format.",
      type = "character",
      default = ""),

    # Path of the file to write; the script writes it with
    # arrow::write_parquet(), so the help text says parquet rather than CSV
    make_option(
      opt_str = c("-o", "--output"),
      help = "Output file, in parquet format.",
      type = "character",
      default = ""),

    # Flag: present means TRUE, absent means FALSE
    make_option(
      opt_str = c("-v", "--verbose"),
      help = "Print additional diagnostic information.",
      action = "store_true",
      default = FALSE)
  )
)
32
+
33
# Turn the command line into a named list of option values
args = parse_args(parser)

# With --verbose, echo the raw trailing arguments received by R
if (args[["verbose"]]) {
  logger(
    "The following arguments have been passed to R:",
    commandArgs(trailingOnly = TRUE)
  )
}
41
+
42
+ # ##################
43
+ # Data processing #
44
+ # ##################
7
45
8
46
# Disable package masking warnings for production.
# FALSE is spelled out in full: `F` is an ordinary variable that can be
# reassigned, whereas `FALSE` is a reserved word.
options(conflicts.policy = list("warn" = FALSE))
10
48
11
- # Load libraries
12
49
library(readr )
13
50
library(tidyr )
14
51
library(dplyr )
15
52
library(stringr )
16
53
library(lubridate )
17
54
18
- # Script arguments
19
- cat(
20
- " \n\n This script requires 2 arguments:" ,
21
- " The path of the CSV export from PHO WTISEN" ,
22
- " The path of the processed parquet output" , sep = " \n -"
23
- )
24
-
25
- args = commandArgs(trailingOnly = T )
26
-
27
- cat(" \n Arguments detected:" , args , sep = " \n -" )
28
-
29
- wtisen_input = args [1 ]
30
- wtisen_output = args [2 ]
31
-
32
- # Extract top content of CSV for logging
33
- cat(" \n File info from PHO WTISEN:\n " )
34
- read_csv(
35
- file = wtisen_input ,
36
- skip = 1 ,
37
- n_max = 1 ,
38
- col_names = F ,
39
- col_select = 2 ,
40
- show_col_types = F ) | >
41
- pull(1 ) | >
42
- str_replace_all(c(
43
- " --" = " \n " ,
44
- " {2,}" = " " ,
45
- " \r " = " " ,
46
- " \n " = " \n " )) | >
47
- cat()
48
-
49
-
50
55
# Utility function
51
56
# Postal code cleaner
52
57
postalcode_check = function (x ){
@@ -108,37 +113,45 @@ postalcode_cleaner = function(x){
108
113
return (x )
109
114
}
110
115
111
- date_bounds = interval(as.POSIXct(" 2008-01-01" ), Sys.Date())
116
# An input path is mandatory: log the problem and abort when none was given
if (args$input == "") {
  logger("No input file specified, ending script")
  stop("No input file specified")
}

# Reached only when an input path was supplied
logger("Reading unprocessed CSV file input from: ", args$input)
112
122
113
123
# Extract CSV content
114
124
wtisen_data = read_csv(
115
- file = wtisen_input ,
116
- skip = 3 ,
117
- col_types = cols_only(
118
- DATE_Collected = col_datetime(format = " %m/%d/%Y %I:%M:%S %p" ),
119
- DATE_RECEIVED = col_datetime(format = " %m/%d/%Y %I:%M:%S %p" ),
120
- Barcode = col_character(),
121
- Laboratory = col_character(),
122
- Sub_Phone = col_character(),
123
- Sub_Alt_Phone = col_character(),
124
- Sub_First_Name2 = col_character(),
125
- Sub_Last_Name2 = col_character(),
126
- SRC_ADDRESS = col_character(),
127
- SRC_LOT_NUM = col_character(),
128
- SRC_CONCESSION = col_character(),
129
- SRC_CITY = col_character(),
130
- SRC_MUNICIPALITY = col_character(),
131
- SRC_COUNTY = col_character(),
132
- SRC_EMERGENCY_LOC_NO = col_character(),
133
- SRC_POSTAL = col_character(),
134
- ENTRY = col_integer(),
135
- FORMATTED_ENTRY = col_character(),
136
- TOTAL_COLIFORM = col_character(),
137
- E_COLI = col_character(),
138
- DATE_RELEASED = col_datetime(format = " %Y-%m-%d %H:%M:%S" ),
139
- DATE_REPORTED = col_datetime(format = " %Y-%m-%d %H:%M:%S" ),
140
- REQ_LEGIBLE = col_character())) | >
141
- rename_with(.fn = \(x ) str_remove_all(str_to_upper(x ), " ^SRC_|^SUB_|2$" )) | >
125
+ file = args $ input ,
126
+ guess_max = 0 ,
127
+ show_col_types = FALSE ) | >
128
+ rename_with(.fn = \(x ) {x | >
129
+ str_to_upper() | >
130
+ str_remove_all(" ^SRC_|^SUB_|2$" )}) | >
131
+ select(
132
+ BARCODE ,
133
+ DATE_COLLECTED ,
134
+ DATE_RECEIVED ,
135
+ DATE_RELEASED ,
136
+ DATE_REPORTED ,
137
+ LABORATORY ,
138
+ PHONE ,
139
+ ALT_PHONE ,
140
+ FIRST_NAME ,
141
+ LAST_NAME ,
142
+ ADDRESS ,
143
+ LOT_NUM ,
144
+ CONCESSION ,
145
+ CITY ,
146
+ MUNICIPALITY ,
147
+ COUNTY ,
148
+ EMERGENCY_LOC_NO ,
149
+ POSTAL ,
150
+ ENTRY ,
151
+ FORMATTED_ENTRY ,
152
+ TOTAL_COLIFORM ,
153
+ E_COLI ,
154
+ REQ_LEGIBLE ) | >
142
155
mutate(
143
156
across(
144
157
.cols = c(
@@ -148,23 +161,25 @@ wtisen_data = read_csv(
148
161
" COUNTY" ),
149
162
.fns = \(x ) str_replace(x , " _" , " " )),
150
163
across(
151
- .cols = where(is.character ),
152
- .fns = str_trim ),
164
+ .cols = starts_with(" DATE_" ),
165
+ .fns = \(x ) {x | >
166
+ as_datetime(format = c(" %m/%d/%Y %I:%M:%S %p" , " %Y-%m-%d %H:%M:%S" )) | >
167
+ force_tz(tz = " America/Toronto" )}),
153
168
across(
154
- .cols = starts_with(" DATE" ),
155
- .fns = \(x ) force_tz(x , tz = " America/Toronto" )),
156
- across(
157
- .cols = starts_with(" DATE" ),
158
- .fns = \(x ) if_else(x %within % date_bounds , x , NA_POSIXct_ )),
169
+ .cols = where(is.character ),
170
+ .fns = \(x ) str_trim(x )),
159
171
POSTAL = postalcode_cleaner(POSTAL ),
160
- REQ_LEGIBLE = str_detect( REQ_LEGIBLE , " ^y|Y$ " )) | >
161
- relocate( " BARCODE " , " REQ_LEGIBLE" , starts_with( " DATE " ))
172
+ ENTRY = as.integer( ENTRY ),
173
+ REQ_LEGIBLE = str_detect( REQ_LEGIBLE , " ^y|Y$ " ))
162
174
163
- cat(" \n Data loaded and processed" )
164
- cat(" \n Dimensions: " , dim(wtisen_data )[1 ], " x " , dim(wtisen_data )[2 ], " \n " , sep = " " )
165
- cat(" \n Fields:" , names(wtisen_data ), sep = " \n -" )
175
# Report that processing finished, with the row and column counts
logger("Data loaded and processed")
logger("Dimensions: ", nrow(wtisen_data), " x ", ncol(wtisen_data))

# The parquet file is written only when an output path was supplied
if (args$output == "") {
  logger("No output location specified, skipping data output")
} else {
  arrow::write_parquet(wtisen_data, args$output)
  logger("Processed data output in parquet format to: ", args$output)
}

logger("Done!")
0 commit comments