From e179f3c461dd9fa0a3a1f3122f053056dc703a78 Mon Sep 17 00:00:00 2001 From: Luke Sikina Date: Tue, 4 Feb 2025 14:28:58 -0500 Subject: [PATCH 1/2] [ALS-7113] Enhance sequential loader --- docker/pic-sure-hpds-etl/Dockerfile | 1 + etl/pom.xml | 28 +- .../data/genotype/util/RemapPatientIds.java | 17 - .../data/phenotype/util/DumpSourceCSV.java | 2 +- .../util/FixCategoricalConcepts.java | 2 +- .../data/phenotype/util/RekeyDataset.java | 2 +- .../hpds/data/phenotype/util/RemapIds.java | 2 +- .../data/phenotype/util/RenameCategories.java | 2 +- .../etl/{phenotype => }/LoadingStore.java | 2 +- .../hpds/etl/phenotype/SequentialLoader.java | 199 ------------ .../etl/phenotype/{ => csv}/CSVLoader.java | 3 +- .../{ => csv}/CSVLoaderNewSearch.java | 51 +-- .../hpds/etl/phenotype/csv/CSVParserUtil.java | 55 ++++ .../etl/phenotype/litecsv/IngestStatus.java | 6 + .../phenotype/litecsv/LowRAMCSVProcessor.java | 162 ++++++++++ .../phenotype/litecsv/LowRAMLoadingStore.java | 267 ++++++++++++++++ .../litecsv/LowRAMMultiCSVLoader.java | 64 ++++ .../{ => sequential}/PhenoRecord.java | 2 +- .../sequential/SequentialLoader.java | 290 ++++++++++++++++++ .../SequentialLoadingStore.java | 2 +- .../etl/phenotype/{ => sql}/SQLLoader.java | 9 +- .../litecsv/LowRAMCSVProcessorTest.java | 182 +++++++++++ .../litecsv/LowRAMMultiCSVLoaderTest.java | 48 +++ .../test/resources/test_named_encryption_key | 1 + .../util/BuildIntegrationTestEnvironment.java | 2 +- 25 files changed, 1122 insertions(+), 279 deletions(-) rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/{phenotype => }/LoadingStore.java (99%) delete mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoader.java rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/{ => csv}/CSVLoader.java (97%) rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/{ => csv}/CSVLoaderNewSearch.java (62%) create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVParserUtil.java create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/IngestStatus.java create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessor.java create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMLoadingStore.java create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoader.java rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/{ => sequential}/PhenoRecord.java (97%) create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoader.java rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/{ => sequential}/SequentialLoadingStore.java (99%) rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/{ => sql}/SQLLoader.java (96%) create mode 100644 etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessorTest.java create mode 100644 etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoaderTest.java create mode 100644 etl/src/test/resources/test_named_encryption_key diff --git a/docker/pic-sure-hpds-etl/Dockerfile b/docker/pic-sure-hpds-etl/Dockerfile index f160b120..661b98a1 100644 --- a/docker/pic-sure-hpds-etl/Dockerfile +++ b/docker/pic-sure-hpds-etl/Dockerfile @@ -28,6 +28,7 @@ COPY --from=build 
/app/docker/pic-sure-hpds-etl/RekeyDataset-jar-with-dependenci COPY --from=build /app/docker/pic-sure-hpds-etl/RemoveConceptFromMetadata-jar-with-dependencies.jar . COPY --from=build /app/docker/pic-sure-hpds-etl/HideAnnotationCategoryValue-jar-with-dependencies.jar . COPY --from=build /app/docker/pic-sure-hpds-etl/SequentialLoader-jar-with-dependencies.jar . +COPY --from=build /app/docker/pic-sure-hpds-etl/LowRAMMultiCSVLoader-jar-with-dependencies.jar . COPY --from=build /app/docker/pic-sure-hpds-etl/create_key.sh . ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -Xmx${HEAPSIZE:-2048}m -jar ${LOADER_NAME:-CSVLoader}-jar-with-dependencies.jar"] diff --git a/etl/pom.xml b/etl/pom.xml index c218d06a..64f087ca 100644 --- a/etl/pom.xml +++ b/etl/pom.xml @@ -135,7 +135,7 @@ - edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.CSVLoaderNewSearch + edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv.CSVLoaderNewSearch ${project.basedir}/../docker/pic-sure-hpds-etl @@ -155,7 +155,7 @@ - edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.CSVLoader + edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv.CSVLoader ${project.basedir}/../docker/pic-sure-hpds-etl @@ -175,7 +175,7 @@ - edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.SQLLoader + edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.sql.SQLLoader ${project.basedir}/../docker/pic-sure-hpds-etl @@ -195,7 +195,7 @@ - edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.SequentialLoader + edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.sequential.SequentialLoader ${project.basedir}/../docker/pic-sure-hpds-etl @@ -210,6 +210,26 @@ single + + buildLiteCSVLoader + + + + edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv.LowRAMMultiCSVLoader + + + ${project.basedir}/../docker/pic-sure-hpds-etl + + jar-with-dependencies + + LowRAMMultiCSVLoader + LowRAMMultiCSVLoader + + package + + single + + buildMultialleleCounter diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java index 0bd73458..22605f5d 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java @@ -1,39 +1,22 @@ package edu.harvard.hms.dbmi.avillach.hpds.data.genotype.util; -import java.io.ByteArrayInputStream; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FileReader; -import java.io.FileWriter; import java.io.IOException; -import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.io.RandomAccessFile; -import java.util.ArrayList; -import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; -import java.util.Map.Entry; -import java.util.concurrent.ExecutionException; -import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableList; import de.siegmar.fastcsv.reader.CsvContainer; import de.siegmar.fastcsv.reader.CsvReader; import de.siegmar.fastcsv.reader.CsvRow; -import de.siegmar.fastcsv.writer.CsvWriter; -import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; -import 
edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; -import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.LoadingStore; public class RemapPatientIds { protected static LoadingCache> store; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/DumpSourceCSV.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/DumpSourceCSV.java index 585204d8..03d892d2 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/DumpSourceCSV.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/DumpSourceCSV.java @@ -21,7 +21,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; -import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.LoadingStore; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; public class DumpSourceCSV { protected static LoadingCache> store; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/FixCategoricalConcepts.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/FixCategoricalConcepts.java index 579d8a16..a651f397 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/FixCategoricalConcepts.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/FixCategoricalConcepts.java @@ -18,7 +18,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; -import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.LoadingStore; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; @SuppressWarnings({"unchecked", "rawtypes"}) public class FixCategoricalConcepts { diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RekeyDataset.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RekeyDataset.java index 6d28bca8..75b35ce3 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RekeyDataset.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RekeyDataset.java @@ -18,7 +18,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; -import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.LoadingStore; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; @SuppressWarnings({"unchecked"}) public class RekeyDataset { diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RemapIds.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RemapIds.java index e8cef09d..6733bc6d 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RemapIds.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RemapIds.java @@ -18,7 +18,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; -import 
edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.LoadingStore; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; @SuppressWarnings({"unchecked", "rawtypes"}) public class RemapIds { diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RenameCategories.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RenameCategories.java index 1711370b..268844e8 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RenameCategories.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RenameCategories.java @@ -20,7 +20,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; -import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.LoadingStore; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; public class RenameCategories { diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/LoadingStore.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/LoadingStore.java similarity index 99% rename from etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/LoadingStore.java rename to etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/LoadingStore.java index f857936e..62b5b8e9 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/LoadingStore.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/LoadingStore.java @@ -1,4 +1,4 @@ -package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype; +package edu.harvard.hms.dbmi.avillach.hpds.etl; import java.io.*; import java.nio.file.Files; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoader.java deleted file mode 100644 index a2e970e9..00000000 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoader.java +++ /dev/null @@ -1,199 +0,0 @@ -package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype; - -import java.io.*; -import java.nio.charset.Charset; -import java.nio.file.Files; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.*; -import java.util.concurrent.ExecutionException; - -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVRecord; -import org.apache.commons.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.RowCallbackHandler; -import org.springframework.jdbc.datasource.DriverManagerDataSource; - -import com.google.common.cache.CacheLoader.InvalidCacheLoadException; - -import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; -import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; - -/** - * Generates an HPDS data store "/opt/local/hpds/allObservationsStore.javabin" with all phenotype concepts from the provided input files. - * - * If no arguments are provided it will read a list of files from /opt/local/hpds/phenotypeInputs.txt, expecting one file per line. 
- * - * @author nchu - * - */ -@SuppressWarnings({"unchecked", "rawtypes"}) -public class SequentialLoader { - - private static final String INPUT_DIR = "/opt/local/hpds_input/"; - - private static final long LOG_INTERVAL = 1000000; - - private static SequentialLoadingStore store = new SequentialLoadingStore(); - - private static Logger log = LoggerFactory.getLogger(SequentialLoader.class); - - private static JdbcTemplate template = null; - - private static long processedRecords = 0; - - public static void main(String[] args) throws IOException, ClassNotFoundException { - - Crypto.loadDefaultKey(); - - List inputFiles = new ArrayList(); - //read in input files - if(args.length > 0) { - inputFiles.addAll(Arrays.asList(args)); - } else { - inputFiles.addAll(readFileList()); - } - - if(inputFiles.size() == 0) { - // check for prior default file locations - File file = new File("/opt/local/hpds/loadQuery.sql"); - if(file.isFile()) { - inputFiles.add("/opt/local/hpds/loadQuery.sql"); - } else { - file = new File("/opt/local/hpds/allConcepts.csv"); - if(file.isFile()) { - inputFiles.add("/opt/local/hpds/allConcepts.csv"); - } - } - } - - log.info("Input files: " + Arrays.deepToString(inputFiles.toArray())); - - //load each into observation store - for(String filename : inputFiles) { - log.info("Loading file " + filename); - if(filename.toLowerCase(Locale.ENGLISH).endsWith("sql")) { - loadSqlFile(filename); - } else if(filename.toLowerCase(Locale.ENGLISH).endsWith("csv")){ - loadCsvFile(filename); - } - } - - //then complete, which will compact, sort, and write out the data in the final place - log.info("found a total of " + processedRecords + " entries"); - store.saveStore(); - store.dumpStats(); - } - - private static List readFileList() throws IOException { - List inputFiles = new ArrayList(); - Files.list(new File(INPUT_DIR).toPath()) - .forEach(path -> { - inputFiles.add(INPUT_DIR + path.getFileName().toString()); - }); - return inputFiles; - } - - private static void loadCsvFile(String filename) throws IOException { - - Reader in = new FileReader(filename); - BufferedReader reader = new BufferedReader(in, 1024*1024); - Iterable records = CSVFormat.DEFAULT.withSkipHeaderRecord().withFirstRecordAsHeader().parse(reader); - - //currentConcept is used to persist data across function calls and identify when we hit a new concept - final PhenoCube[] currentConcept = new PhenoCube[1]; - for (CSVRecord record : records) { - if(record.size()<4) { - log.warn("Record number " + record.getRecordNumber() - + " had less records than we expected so we are skipping it."); - continue; - } - processRecord(currentConcept, new PhenoRecord(record)); - } - reader.close(); - in.close(); - } - - private static void loadSqlFile(String filename) throws FileNotFoundException, IOException { - loadTemplate(); - String loadQuery = IOUtils.toString(new FileInputStream(filename), Charset.forName("UTF-8")); - - final PhenoCube[] currentConcept = new PhenoCube[1]; //used to identify new concepts - template.query(loadQuery, new RowCallbackHandler() { - - @Override - public void processRow(ResultSet result) throws SQLException { - processRecord(currentConcept, new PhenoRecord(result)); - } - }); - } - - - private static void loadTemplate() throws FileNotFoundException, IOException { - if (template == null) { - Properties props = new Properties(); - props.load(new FileInputStream(INPUT_DIR + "sql.properties")); - template = new JdbcTemplate(new DriverManagerDataSource(props.getProperty("datasource.url"), 
props.getProperty("datasource.user"), - props.getProperty("datasource.password"))); - } - } - - private static void processRecord(final PhenoCube[] currentConcept, PhenoRecord record) { - if(record == null ) { - return; - } - - try { - String conceptPathFromRow = record.getConceptPath(); - String[] segments = conceptPathFromRow.split("\\\\"); - for(int x = 0;x 4 && record.get(DATETIME) != null && !record.get(DATETIME).isEmpty()) { - date = new Date(Long.parseLong(record.get(DATETIME))); + if (record.size() > 4 && record.get(CSVParserUtil.DATETIME) != null && !record.get(CSVParserUtil.DATETIME).isEmpty()) { + date = new Date(Long.parseLong(record.get(CSVParserUtil.DATETIME))); } currentConcept[0].add(patientId, isAlpha ? value : Double.parseDouble(value), date); store.allIds.add(patientId); @@ -91,28 +82,4 @@ private static PhenoCube getPhenoCube(PhenoCube currentConcept, String conceptPa return currentConcept; } - - private static String getSanitizedConceptPath(CSVRecord record) { - String conceptPathFromRow = record.get(CONCEPT_PATH); - String[] segments = conceptPathFromRow.split("\\\\"); - for (int x = 0; x < segments.length; x++) { - segments[x] = segments[x].trim(); - } - conceptPathFromRow = String.join("\\", segments) + "\\"; - conceptPathFromRow = conceptPathFromRow.replaceAll("\\ufffd", ""); - String textValueFromRow = record.get(TEXT_VALUE) == null ? null : record.get(TEXT_VALUE).trim(); - if (textValueFromRow != null) { - textValueFromRow = textValueFromRow.replaceAll("\\ufffd", ""); - } - String conceptPath; - - if (DO_VARNAME_ROLLUP) { - conceptPath = conceptPathFromRow.endsWith("\\" + textValueFromRow + "\\") ? conceptPathFromRow.replaceAll("\\\\[^\\\\]*\\\\$", "\\\\") : conceptPathFromRow; - } else { - conceptPath = conceptPathFromRow; - } - return conceptPath; - } - - } diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVParserUtil.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVParserUtil.java new file mode 100644 index 00000000..9f5181a7 --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVParserUtil.java @@ -0,0 +1,55 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv; + +import org.apache.commons.csv.CSVRecord; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Date; +import java.util.stream.Collectors; + +public class CSVParserUtil { + public static final int PATIENT_NUM = 0; + public static final int CONCEPT_PATH = 1; + public static final int NUMERIC_VALUE = 2; + public static final int TEXT_VALUE = 3; + public static final int DATETIME = 4; + + public static String parseConceptPath(CSVRecord record, boolean doVarNameRollup) { + String conceptPathFromRow = record.get(CONCEPT_PATH); + conceptPathFromRow = Arrays.stream(conceptPathFromRow.split("\\\\")) + .map(String::trim) + .collect(Collectors.joining("\\")) + "\\"; + conceptPathFromRow = stripWeirdUnicodeChars(conceptPathFromRow); + + // \\ufffd = � + String textValueFromRow = stripWeirdUnicodeChars(trim(record.get(TEXT_VALUE))); + if (doVarNameRollup && conceptPathFromRow.endsWith("\\" + textValueFromRow + "\\")) { + // This regex deletes the last node from the concept path, i.e. 
"rolling it up" + return conceptPathFromRow.replaceAll("\\\\[^\\\\]*\\\\$", "\\\\"); + } else { + return conceptPathFromRow; + } + } + + private static String stripWeirdUnicodeChars(@Nonnull String raw) { + return raw.replaceAll("\\ufffd", ""); + } + + public static String trim(@Nullable String maybeString) { + return maybeString == null ? "" : maybeString.trim(); + } + + public static @Nullable Date parseDate(CSVRecord record) { + Date date = null; + try { + if (record.size() > 4 && record.get(DATETIME) != null && !record.get(DATETIME).isEmpty()) { + date = new Date(Long.parseLong(record.get(DATETIME))); + } + } catch (NumberFormatException e) { + return null; + } + + return date; + } +} diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/IngestStatus.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/IngestStatus.java new file mode 100644 index 00000000..33748767 --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/IngestStatus.java @@ -0,0 +1,6 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv; + +import java.nio.file.Path; + +public record IngestStatus(Path file, long lineCount, int conceptCount, long duration) { +} diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessor.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessor.java new file mode 100644 index 00000000..e0674e1b --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessor.java @@ -0,0 +1,162 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv; + +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; +import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv.CSVParserUtil; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.logging.log4j.util.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.*; +import java.nio.file.Files; +import java.util.*; +import java.util.stream.Stream; + +public class LowRAMCSVProcessor { + + private static final Logger log = LoggerFactory.getLogger(LowRAMCSVProcessor.class); + private final LowRAMLoadingStore store; + private final boolean doVarNameRollup; + private final double maxChunkSizeGigs; + + public LowRAMCSVProcessor(LowRAMLoadingStore store, boolean doVarNameRollup, double maxChunkSizeGigs) { + this.store = store; + this.doVarNameRollup = doVarNameRollup; + this.maxChunkSizeGigs = maxChunkSizeGigs; + } + + public IngestStatus process(File csv) { + long startTime = System.nanoTime(); + log.info("Attempting to ingest file {}", csv.getAbsolutePath()); + try (Reader r = new FileReader(csv); Stream rawLines = Files.lines(csv.toPath())) { + CSVParser parser = CSVFormat.DEFAULT.withSkipHeaderRecord().withFirstRecordAsHeader().parse(new BufferedReader(r)); + + // we want to read the file in reasonably sized chunks so that we can handle chunks naively + // in memory without going OOM + // to do this, we're going to assume that over the course of thousands of lines, each line + // is more or less the same length + // so we'll just provision n chunks, where n is max(1, file_size/max_chunk_size) + log.info("Gathering stats about file {}", csv.getName()); + int chunks = Math.max(1, (int) Math.ceil((double) csv.length() / 
(maxChunkSizeGigs * 1024 * 1024 * 1024))); + final long totalLineCount = rawLines.count(); + final long linesPerChunk = totalLineCount / chunks; + log.info( + "File {} is {} bytes and {} lines. Dividing into {} chunks of {} lines each", csv.getName(), csv.length(), totalLineCount, + chunks, linesPerChunk + ); + long chunkLineCount = 0; + long lineCount = 0; + int chunkCount = 0; + Set concepts = new HashSet<>(); + List lines = new ArrayList<>(); + + log.info("Creating chunks"); + for (CSVRecord record : parser) { + chunkLineCount++; + lineCount++; + lines.add(record); + if (chunkLineCount > linesPerChunk || lineCount + 1 == totalLineCount) { + log.info("Finished creating chunk {}", chunkCount); + // sort by concept to prevent cache thrashing when ingesting + // loading each concept in its entirety for a chunk will minimize disk IO and + // let us keep more valuable things in RAM + lines.sort(Comparator.comparing(a -> a.get(1))); + log.info("Finished sorting chunk {}", chunkCount); + Set chunkConcepts = ingest(lines); + concepts.addAll(chunkConcepts); + log.info("Finished ingesting chunk {} with {} unique concepts", chunkCount, chunkConcepts.size()); + lines = new ArrayList<>(); + chunkLineCount = 0; + chunkCount++; + } + } + + return new IngestStatus(csv.toPath(), totalLineCount, concepts.size(), System.nanoTime() - startTime); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + + private Set ingest(List sortedRecords) { + Set concepts = new HashSet<>(); + for (CSVRecord record : sortedRecords) { + if (record.size() < 4) { + log.warn("Record #{} has too few columns, skipping.", record.getRecordNumber()); + continue; + } + + String conceptPath = CSVParserUtil.parseConceptPath(record, doVarNameRollup); + // TODO: check logic + IngestFn ingestFn = Strings.isEmpty(record.get(CSVParserUtil.NUMERIC_VALUE)) ? this::ingestNonNumeric : this::ingestNumeric; + Date date = CSVParserUtil.parseDate(record); + int patientId = Integer.parseInt(record.get(CSVParserUtil.PATIENT_NUM)); + if (ingestFn.attemptIngest(record, conceptPath, patientId, date)) { + concepts.add(conceptPath); + } else { + log.warn("Could not ingest record #{}", record.getRecordNumber()); + } + } + return concepts; + } + + @FunctionalInterface + private interface IngestFn { + boolean attemptIngest(CSVRecord record, String path, int patientId, @Nullable Date date); + } + + private boolean ingestNumeric(CSVRecord record, String conceptPath, int patientId, Date date) { + PhenoCube concept = store.loadingCache.getIfPresent(conceptPath); + if (concept == null) { + concept = new PhenoCube<>(conceptPath, Double.class); + concept.setColumnWidth(Double.BYTES); + store.loadingCache.put(conceptPath, concept); + } + if (!concept.vType.equals(Double.class)) { + log.error(""" + Concept bucket {} was configured for non-numeric types, but received numeric value {} + This happens when, for a single concept, there are rows that have a tval_char, and other + rows with an nval_num but no tval_char. 
+ Offending record #{} + """, conceptPath, record.get(CSVParserUtil.NUMERIC_VALUE), record.getRecordNumber()); + return false; + } + try { + String rawNumericValue = CSVParserUtil.trim(record.get(CSVParserUtil.NUMERIC_VALUE)); + double parsedValue = Double.parseDouble(rawNumericValue); + concept.add(patientId, parsedValue, date); + return true; + } catch (NumberFormatException e) { + log.warn("Could not parse numeric value in line {}", record); + } + return false; + } + + private boolean ingestNonNumeric(CSVRecord record, String conceptPath, int patientId, Date date) { + PhenoCube concept = store.loadingCache.getIfPresent(conceptPath); + if (concept == null) { + concept = new PhenoCube<>(conceptPath, String.class); + store.loadingCache.put(conceptPath, concept); + } + if (!concept.vType.equals(String.class)) { + log.error(""" + Concept bucket {} was configured for numeric types, but received non-numeric value {} + This happens when, for a single concept, there are rows that have a tval_char, and other + rows with an nval_num but no tval_char. + Offending record #{} + """, conceptPath, record.get(CSVParserUtil.TEXT_VALUE), record.getRecordNumber()); + return false; + } + String rawTextValue = CSVParserUtil.trim(record.get(CSVParserUtil.TEXT_VALUE)); + if (rawTextValue.isEmpty()) { + return false; + } + concept.setColumnWidth(Math.max(rawTextValue.getBytes().length, concept.getColumnWidth())); + concept.add(patientId, rawTextValue, date); + return true; + } +} diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMLoadingStore.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMLoadingStore.java new file mode 100644 index 00000000..80cd0f77 --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMLoadingStore.java @@ -0,0 +1,267 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv; + +import com.google.common.cache.*; +import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue; +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import static edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto.DEFAULT_KEY_NAME; +import static java.nio.file.StandardOpenOption.*; + +/** + * This class provides similar functionality to the LoadingStore class, but is designed with sequential loading in mind. + * + * This will write out partial cubes to individual files instead of assuming that they can immediately be sequenced into the + * allObservationsStore; this will allow us to pick them back up and add more patients to existing stores so that we can better handle + * fragmented data. 
+ * + * + * @author nchu + * + */ +public class LowRAMLoadingStore { + + private final String columnmetaFilename; + protected final String observationsFilename; + protected final String obsTempFilename; + private final String encryptionKeyName; + + private final RandomAccessFile allObservationsTemp; + + TreeMap metadataMap = new TreeMap<>(); + + private static Logger log = LoggerFactory.getLogger(LowRAMLoadingStore.class); + + public LowRAMLoadingStore() { + this( + "/opt/local/hpds/allObservationsTemp.javabin", "/opt/local/hpds/columnMeta.javabin", + "/opt/local/hpds/allObservationsStore.javabin", DEFAULT_KEY_NAME + ); + } + + public LowRAMLoadingStore( + String observationsTempFile, String columnMetaTempFile, String observationsPermFile, String encryptionKeyName + ) { + obsTempFilename = observationsTempFile; + columnmetaFilename = columnMetaTempFile; + observationsFilename = observationsPermFile; + this.encryptionKeyName = encryptionKeyName; + try { + allObservationsTemp = new RandomAccessFile(obsTempFilename, "rw"); + } catch (FileNotFoundException e) { + throw new UncheckedIOException(e); + } + } + + public LoadingCache loadingCache = + CacheBuilder.newBuilder().maximumSize(1).removalListener(new RemovalListener() { + + @Override + public void onRemoval(RemovalNotification cubeRemoval) { + if (cubeRemoval.getValue().getLoadingMap() != null) { + try ( + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); ObjectOutputStream out = + new ObjectOutputStream(byteStream) + ) { + ColumnMeta columnMeta = new ColumnMeta().setName(cubeRemoval.getKey()) + .setWidthInBytes(cubeRemoval.getValue().getColumnWidth()).setCategorical(cubeRemoval.getValue().isStringType()); + columnMeta.setAllObservationsOffset(allObservationsTemp.getFilePointer()); + // write out the basic key/value map for loading; this will be compacted and finalized after all concepts are read + // in. 
+ out.writeObject(cubeRemoval.getValue().getLoadingMap()); + out.flush(); + + allObservationsTemp.write(byteStream.toByteArray()); + columnMeta.setAllObservationsLength(allObservationsTemp.getFilePointer()); + metadataMap.put(columnMeta.getName(), columnMeta); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + } + }).build(new CacheLoader<>() { + public PhenoCube load(String key) throws Exception { + ColumnMeta columnMeta = metadataMap.get(key); + if (columnMeta != null) { + log.debug("Loading concept : [" + key + "]"); + return getCubeFromTemp(columnMeta); + } else { + return null; + } + } + }); + + public TreeSet allIds = new TreeSet(); + + public void saveStore() throws FileNotFoundException, IOException, ClassNotFoundException { + log.info("flushing temp storage"); + loadingCache.invalidateAll(); + loadingCache.cleanUp(); + + RandomAccessFile allObservationsStore = new RandomAccessFile(observationsFilename, "rw"); + // we dumped it all in a temp file; now sort all the data and compress it into the real Store + for (String concept : metadataMap.keySet()) { + ColumnMeta columnMeta = metadataMap.get(concept); + log.debug("Writing concept : [{}]", concept); + PhenoCube cube = getCubeFromTemp(columnMeta); + complete(columnMeta, cube); + write(allObservationsStore, columnMeta, cube); + } + allObservationsStore.close(); + + log.info("Writing metadata"); + ObjectOutputStream metaOut = new ObjectOutputStream(new GZIPOutputStream(new FileOutputStream(new File(columnmetaFilename)))); + metaOut.writeObject(metadataMap); + metaOut.writeObject(allIds); + metaOut.flush(); + metaOut.close(); + + log.info("Cleaning up temporary file"); + + allObservationsTemp.close(); + File tempFile = new File(obsTempFilename); + tempFile.delete(); + dumpStatsAndColumnMeta("/opt/local/hpds/"); + } + + private void write(RandomAccessFile allObservationsStore, ColumnMeta columnMeta, PhenoCube cube) throws IOException { + columnMeta.setAllObservationsOffset(allObservationsStore.getFilePointer()); + + try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(byteStream);) { + + out.writeObject(cube); + out.flush(); + allObservationsStore.write(Crypto.encryptData(encryptionKeyName, byteStream.toByteArray())); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + columnMeta.setAllObservationsLength(allObservationsStore.getFilePointer()); + } + + private PhenoCube getCubeFromTemp(ColumnMeta columnMeta) throws IOException, ClassNotFoundException { + allObservationsTemp.seek(columnMeta.getAllObservationsOffset()); + int length = (int) (columnMeta.getAllObservationsLength() - columnMeta.getAllObservationsOffset()); + byte[] buffer = new byte[length]; + allObservationsTemp.read(buffer); + allObservationsTemp.seek(allObservationsTemp.length()); + ObjectInputStream inStream = new ObjectInputStream(new ByteArrayInputStream(buffer)); + + PhenoCube cube = new PhenoCube(columnMeta.getName(), columnMeta.isCategorical() ? 
String.class : Double.class); + cube.setLoadingMap((List) inStream.readObject()); + cube.setColumnWidth(columnMeta.getWidthInBytes()); + inStream.close(); + return cube; + } + + private > void complete(ColumnMeta columnMeta, PhenoCube cube) { + ArrayList> entryList = new ArrayList>(cube.getLoadingMap().stream().map((entry) -> { + return new KeyAndValue(entry.getKey(), entry.getValue(), entry.getTimestamp()); + }).collect(Collectors.toList())); + + List> sortedByKey = + entryList.stream().sorted(Comparator.comparing(KeyAndValue::getKey)).collect(Collectors.toList()); + cube.setSortedByKey(sortedByKey.toArray(new KeyAndValue[0])); + + if (cube.isStringType()) { + TreeMap> categoryMap = new TreeMap<>(); + for (KeyAndValue entry : cube.sortedByValue()) { + if (!categoryMap.containsKey(entry.getValue())) { + categoryMap.put(entry.getValue(), new LinkedList()); + } + categoryMap.get(entry.getValue()).add(entry.getKey()); + } + TreeMap> categorySetMap = new TreeMap<>(); + categoryMap.entrySet().stream().forEach((entry) -> { + categorySetMap.put(entry.getKey(), new TreeSet(entry.getValue())); + }); + cube.setCategoryMap(categorySetMap); + } + + columnMeta.setObservationCount(cube.sortedByKey().length); + columnMeta.setPatientCount(Arrays.stream(cube.sortedByKey()).map((kv) -> { + return kv.getKey(); + }).collect(Collectors.toSet()).size()); + if (columnMeta.isCategorical()) { + columnMeta.setCategoryValues(new ArrayList(new TreeSet((List) cube.keyBasedArray()))); + } else { + List map = cube.keyBasedArray().stream().map((value) -> { + return (Double) value; + }).collect(Collectors.toList()); + double min = Double.MAX_VALUE; + double max = Double.MIN_VALUE; + for (double f : map) { + min = Double.min(min, f); + max = Double.max(max, f); + } + columnMeta.setMin(min); + columnMeta.setMax(max); + } + + } + + public void dumpStatsAndColumnMeta(String hpdsDirectory) { + try ( + FileInputStream fIn = new FileInputStream(hpdsDirectory + "columnMeta.javabin"); GZIPInputStream gIn = + new GZIPInputStream(fIn); ObjectInputStream oIn = new ObjectInputStream(gIn); BufferedWriter csvWriter = + Files.newBufferedWriter(Paths.get(hpdsDirectory + "columnMeta.csv"), CREATE, TRUNCATE_EXISTING) + ) { + TreeMap metastore = (TreeMap) oIn.readObject(); + CSVPrinter printer = new CSVPrinter(csvWriter, CSVFormat.DEFAULT); + for (String key : metastore.keySet()) { + String[] columnMetaOut = createRow(key, metastore); + printer.printRecord(columnMetaOut); + } + csvWriter.flush(); + } catch (IOException | ClassNotFoundException e) { + log.error("Error loading store or dumping store meta to CSV: ", e); + } + } + + private static String[] createRow(String key, TreeMap metastore) { + ColumnMeta columnMeta = metastore.get(key); + String[] columnMetaOut = new String[11]; + + StringBuilder listQuoted = new StringBuilder(); + AtomicInteger x = new AtomicInteger(1); + + if (columnMeta.getCategoryValues() != null) { + if (!columnMeta.getCategoryValues().isEmpty()) { + columnMeta.getCategoryValues().forEach(string -> { + listQuoted.append(string); + if (x.get() != columnMeta.getCategoryValues().size()) listQuoted.append("µ"); + x.incrementAndGet(); + }); + } + } + + columnMetaOut[0] = columnMeta.getName(); + columnMetaOut[1] = String.valueOf(columnMeta.getWidthInBytes()); + columnMetaOut[2] = String.valueOf(columnMeta.getColumnOffset()); + columnMetaOut[3] = String.valueOf(columnMeta.isCategorical()); + // this should nest the list of values in a list inside the String array. 
+ columnMetaOut[4] = listQuoted.toString(); + columnMetaOut[5] = String.valueOf(columnMeta.getMin()); + columnMetaOut[6] = String.valueOf(columnMeta.getMax()); + columnMetaOut[7] = String.valueOf(columnMeta.getAllObservationsOffset()); + columnMetaOut[8] = String.valueOf(columnMeta.getAllObservationsLength()); + columnMetaOut[9] = String.valueOf(columnMeta.getObservationCount()); + columnMetaOut[10] = String.valueOf(columnMeta.getPatientCount()); + return columnMetaOut; + } + + +} diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoader.java new file mode 100644 index 00000000..81b07450 --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoader.java @@ -0,0 +1,64 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv; + +import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.stream.Stream; + +public class LowRAMMultiCSVLoader { + + private static final Logger log = LoggerFactory.getLogger(LowRAMMultiCSVLoader.class); + private final LowRAMCSVProcessor processor; + private final String inputDir; + + public LowRAMMultiCSVLoader(LowRAMLoadingStore store, LowRAMCSVProcessor processor, String inputDir) { + this.inputDir = inputDir == null ? "/opt/local/hpds_input" : inputDir; + store = store == null ? new LowRAMLoadingStore() : store; + this.processor = processor == null ? new LowRAMCSVProcessor(store, false, 5D) : processor; + Crypto.loadDefaultKey(); + } + + public static void main(String[] args) { + boolean rollUpVarNames = true; + if (args.length > 1) { + if (args[0].equalsIgnoreCase("NO_ROLLUP")) { + log.info("Configured to not roll up variable names"); + rollUpVarNames = false; + } + } + String inputDir = "/opt/local/hpds_input"; + LowRAMLoadingStore store = new LowRAMLoadingStore(); + LowRAMCSVProcessor lowRAMCSVProcessor = new LowRAMCSVProcessor(store, rollUpVarNames, 5D); + int exitCode = new LowRAMMultiCSVLoader(store, lowRAMCSVProcessor, inputDir).processCSVsFromHPDSDir(); + try { + store.saveStore(); + } catch (IOException | ClassNotFoundException e) { + log.error("Error saving store: ", e); + System.exit(1); + } + System.exit(exitCode); + } + + protected int processCSVsFromHPDSDir() { + // find all files + log.info("Looking for files to process. 
All files must be smaller than 5G"); + log.info("Files larger than 5G should be split into a series of CSVs"); + try (Stream input_files = Files.list(Path.of(inputDir))) { + input_files.map(Path::toFile).filter(File::isFile).peek(f -> log.info("Found file {}", f.getAbsolutePath())) + + .filter(f -> f.getName().endsWith(".csv")).peek(f -> log.info("Confirmed file {} is a .csv", f.getAbsolutePath())) + + .map(processor::process).forEach(status -> log.info("Finished processing file {}", status)); + return 0; + } catch (IOException e) { + log.error("Exception processing files: ", e); + return 1; + } + } + + +} diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/PhenoRecord.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/PhenoRecord.java similarity index 97% rename from etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/PhenoRecord.java rename to etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/PhenoRecord.java index 9380a803..ad89dae5 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/PhenoRecord.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/PhenoRecord.java @@ -1,4 +1,4 @@ -package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype; +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.sequential; import java.sql.ResultSet; import java.sql.SQLException; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoader.java new file mode 100644 index 00000000..5b216871 --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoader.java @@ -0,0 +1,290 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.sequential; + +import java.io.*; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.ExecutionException; + +import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv.LowRAMMultiCSVLoader; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowCallbackHandler; +import org.springframework.jdbc.datasource.DriverManagerDataSource; + +import com.google.common.cache.CacheLoader.InvalidCacheLoadException; + +import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; + +/** + * Generates an HPDS data store "/opt/local/hpds/allObservationsStore.javabin" with all phenotype concepts from the provided input files. + * + * If no arguments are provided it will read a list of files from /opt/local/hpds/phenotypeInputs.txt, expecting one file per line. 
+ * + * @author nchu + * + */ +@SuppressWarnings({"unchecked", "rawtypes"}) +public class SequentialLoader { + + private static final String INPUT_DIR = "/opt/local/hpds_input/"; + + private static final long LOG_INTERVAL = 1000000; + + private static SequentialLoadingStore store = new SequentialLoadingStore(); + + private static Logger log = LoggerFactory.getLogger(SequentialLoader.class); + + private static JdbcTemplate template = null; + + private static long processedRecords = 0; + + private static final int PATIENT_NUM = 0; + + private static final int CONCEPT_PATH = 1; + + private static final int NUMERIC_VALUE = 2; + + private static final int TEXT_VALUE = 3; + + private static final int DATETIME = 4; + + private static String HPDS_DIRECTORY = "/opt/local/hpds/"; + + public static void main(String[] args) throws IOException, ClassNotFoundException { + + Crypto.loadDefaultKey(); + + List inputFiles = new ArrayList(); + // read in input files + if (args.length > 0) { + inputFiles.addAll(Arrays.asList(args)); + } else { + inputFiles.addAll(readFileList()); + } + + if (inputFiles.size() == 0) { + // check for prior default file locations + File file = new File("/opt/local/hpds/loadQuery.sql"); + if (file.isFile()) { + inputFiles.add("/opt/local/hpds/loadQuery.sql"); + } else { + file = new File("/opt/local/hpds/allConcepts.csv"); + if (file.isFile()) { + inputFiles.add("/opt/local/hpds/allConcepts.csv"); + } + } + } + + log.info("Input files: " + Arrays.deepToString(inputFiles.toArray())); + + if (inputFiles.stream().allMatch(f -> f.endsWith(".csv"))) { + LowRAMMultiCSVLoader.main(args); + } + + // load each into observation store + for (String filename : inputFiles) { + log.info("Loading file " + filename); + if (filename.toLowerCase(Locale.ENGLISH).endsWith("sql")) { + loadSqlFile(filename); + } else if (filename.toLowerCase(Locale.ENGLISH).endsWith("csv")) { + loadCsvFile(filename); + } + } + + // then complete, which will compact, sort, and write out the data in the final place + log.info("found a total of " + processedRecords + " entries"); + store.saveStore(); + store.dumpStats(); + } + + private static List readFileList() throws IOException { + List inputFiles = new ArrayList(); + Files.list(new File(INPUT_DIR).toPath()).forEach(path -> { + inputFiles.add(INPUT_DIR + path.getFileName().toString()); + }); + return inputFiles; + } + + private static void loadCsvFile(String filename) throws IOException { + + Reader in = new FileReader(filename); + BufferedReader reader = new BufferedReader(in, 1024 * 1024); + Iterable records = CSVFormat.DEFAULT.withSkipHeaderRecord().withFirstRecordAsHeader().parse(reader); + + // currentConcept is used to persist data across function calls and identify when we hit a new concept + final PhenoCube[] currentConcept = new PhenoCube[1]; + int count = 0; + for (CSVRecord record : records) { + count++; + if (count % 5000 == 0) { + log.info("Processed {} records for file {}", count, filename); + } + if (record.size() < 4) { + log.warn("Record number " + record.getRecordNumber() + " had less records than we expected so we are skipping it."); + continue; + } + processRecord(currentConcept, record); + } + reader.close(); + in.close(); + } + + private static void loadSqlFile(String filename) throws FileNotFoundException, IOException { + loadTemplate(); + String loadQuery = IOUtils.toString(new FileInputStream(filename), Charset.forName("UTF-8")); + + final PhenoCube[] currentConcept = new PhenoCube[1]; // used to identify new concepts + 
template.query(loadQuery, new RowCallbackHandler() { + + @Override + public void processRow(ResultSet result) throws SQLException { + processRecord(currentConcept, new PhenoRecord(result)); + } + }); + } + + + private static void loadTemplate() throws FileNotFoundException, IOException { + if (template == null) { + Properties props = new Properties(); + props.load(new FileInputStream(INPUT_DIR + "sql.properties")); + template = new JdbcTemplate( + new DriverManagerDataSource( + props.getProperty("datasource.url"), props.getProperty("datasource.user"), props.getProperty("datasource.password") + ) + ); + } + } + + private static void processRecord(final PhenoCube[] currentConcept, CSVRecord record) { + if (record.size() < 4) { + log.info("Record number " + record.getRecordNumber() + " had less records than we expected so we are skipping it."); + return; + } + + try { + String conceptPathFromRow = record.get(CONCEPT_PATH); + String[] segments = conceptPathFromRow.split("\\\\"); + for (int x = 0; x < segments.length; x++) { + segments[x] = segments[x].trim(); + } + conceptPathFromRow = String.join("\\", segments) + "\\"; + conceptPathFromRow = conceptPathFromRow.replaceAll("\\ufffd", ""); + String textValueFromRow = record.get(TEXT_VALUE) == null ? null : record.get(TEXT_VALUE).trim(); + if (textValueFromRow != null) { + textValueFromRow = textValueFromRow.replaceAll("\\ufffd", ""); + } + String conceptPath = + conceptPathFromRow.endsWith("\\" + textValueFromRow + "\\") ? conceptPathFromRow.replaceAll("\\\\[^\\\\]*\\\\$", "\\\\") + : conceptPathFromRow; + // This is not getDouble because we need to handle null values, not coerce them into 0s + String numericValue = record.get(NUMERIC_VALUE); + if ((numericValue == null || numericValue.isEmpty()) && textValueFromRow != null) { + try { + numericValue = Double.parseDouble(textValueFromRow) + ""; + } catch (NumberFormatException e) { + + } + } + boolean isAlpha = (numericValue == null || numericValue.isEmpty()); + if (currentConcept[0] == null || !currentConcept[0].name.equals(conceptPath)) { + try { + currentConcept[0] = store.loadingCache.get(conceptPath); + } catch (InvalidCacheLoadException e) { + currentConcept[0] = new PhenoCube(conceptPath, isAlpha ? String.class : Double.class); + store.loadingCache.put(conceptPath, currentConcept[0]); + } + } + String value = isAlpha ? record.get(TEXT_VALUE) : numericValue; + + if ( + value != null && !value.trim().isEmpty() + && ((isAlpha && currentConcept[0].vType == String.class) || (!isAlpha && currentConcept[0].vType == Double.class)) + ) { + value = value.trim(); + currentConcept[0] + .setColumnWidth(isAlpha ? Math.max(currentConcept[0].getColumnWidth(), value.getBytes().length) : Double.BYTES); + int patientId = Integer.parseInt(record.get(PATIENT_NUM)); + Date date = null; + if (record.size() > 4 && record.get(DATETIME) != null && !record.get(DATETIME).isEmpty()) { + date = new Date(Long.parseLong(record.get(DATETIME))); + } + currentConcept[0].add(patientId, isAlpha ? value : Double.parseDouble(value), date); + store.allIds.add(patientId); + } + } catch (ExecutionException e) { + // todo: do we really want to ignore this? 
+ log.error("Error processing record", e); + } + } + + private static void processRecord(final PhenoCube[] currentConcept, PhenoRecord record) { + if (record == null) { + return; + } + + try { + String conceptPathFromRow = record.getConceptPath(); + String[] segments = conceptPathFromRow.split("\\\\"); + for (int x = 0; x < segments.length; x++) { + segments[x] = segments[x].trim(); + } + conceptPathFromRow = String.join("\\", segments) + "\\"; + conceptPathFromRow = conceptPathFromRow.replaceAll("\\ufffd", ""); + String textValueFromRow = record.getTextValue(); + if (textValueFromRow != null) { + textValueFromRow = textValueFromRow.replaceAll("\\ufffd", ""); + } + String conceptPath = + conceptPathFromRow.endsWith("\\" + textValueFromRow + "\\") ? conceptPathFromRow.replaceAll("\\\\[^\\\\]*\\\\$", "\\\\") + : conceptPathFromRow; + // This is not getDouble because we need to handle null values, not coerce them into 0s + String numericValue = record.getNumericValue(); + if ((numericValue == null || numericValue.isEmpty()) && textValueFromRow != null) { + try { + numericValue = Double.parseDouble(textValueFromRow) + ""; + } catch (NumberFormatException e) { + + } + } + boolean isAlpha = (numericValue == null || numericValue.isEmpty()); + if (currentConcept[0] == null || !currentConcept[0].name.equals(conceptPath)) { + try { + currentConcept[0] = store.loadingCache.get(conceptPath); + } catch (InvalidCacheLoadException e) { + log.debug("New concept " + record.getConceptPath()); + currentConcept[0] = new PhenoCube(conceptPath, isAlpha ? String.class : Double.class); + store.loadingCache.put(conceptPath, currentConcept[0]); + } + } + String value = isAlpha ? record.getTextValue() : numericValue; + + if ( + value != null && !value.trim().isEmpty() + && ((isAlpha && currentConcept[0].vType == String.class) || (!isAlpha && currentConcept[0].vType == Double.class)) + ) { + value = value.trim(); + currentConcept[0] + .setColumnWidth(isAlpha ? Math.max(currentConcept[0].getColumnWidth(), value.getBytes().length) : Double.BYTES); + int patientId = record.getPatientNum(); + + currentConcept[0].add(patientId, isAlpha ? value : Double.parseDouble(value), record.getDateTime()); + store.allIds.add(patientId); + } + if (++processedRecords % LOG_INTERVAL == 0) { + log.info("Loaded " + processedRecords + " records"); + } + } catch (ExecutionException e) { + // todo: do we really want to ignore this? 
+ log.error("Error processing record", e); + } + } +} diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoadingStore.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoadingStore.java similarity index 99% rename from etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoadingStore.java rename to etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoadingStore.java index c87b6434..370c6a78 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoadingStore.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoadingStore.java @@ -1,4 +1,4 @@ -package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype; +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.sequential; import java.io.*; import java.util.*; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SQLLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sql/SQLLoader.java similarity index 96% rename from etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SQLLoader.java rename to etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sql/SQLLoader.java index 01860baa..59433470 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SQLLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sql/SQLLoader.java @@ -1,24 +1,19 @@ -package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype; +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.sql; import java.sql.ResultSet; import java.sql.SQLException; -import java.text.ParseException; import java.text.SimpleDateFormat; import java.io.*; import java.nio.charset.Charset; -import java.util.Date; import java.util.Properties; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import com.google.common.cache.CacheLoader.InvalidCacheLoadException; import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessorTest.java b/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessorTest.java new file mode 100644 index 00000000..30121149 --- /dev/null +++ b/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessorTest.java @@ -0,0 +1,182 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv; + + +import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.TreeMap; +import java.util.zip.GZIPInputStream; + +class LowRAMCSVProcessorTest { + + private static final String TEST_KEY_PATH = "src/test/resources/test_named_encryption_key"; + + + @Test + void 
shouldProcessSimpleCSV(@TempDir File testDir) throws IOException, ClassNotFoundException { + String content = """ + PATIENT_NUM,CONCEPT_PATH,NUMERIC_VALUE,TEXT_VALUE,DATETIME + 1,\\foo\\1\\,,val,1 + 1,\\foo\\2\\,0,,1 + 2,\\foo\\1\\,,lav,1 + 2,\\foo\\2\\,99,,1 + """; + Path csvPath = Path.of(testDir.getAbsolutePath(), "test.csv"); + Files.writeString(csvPath, content); + LowRAMLoadingStore store = new LowRAMLoadingStore( + testDir.getAbsolutePath() + "/allObservationsTemp.javabin", testDir.getAbsolutePath() + "/columnMeta.javabin", + testDir.getAbsolutePath() + "/allObservationsStore.javabin", "TEST_KEY" + ); + + IngestStatus status = new LowRAMCSVProcessor(store, false, 1D).process(csvPath.toFile()); + + Assertions.assertEquals(2, status.conceptCount()); + Assertions.assertEquals(5, status.lineCount()); + Assertions.assertEquals(csvPath, status.file()); + + Crypto.loadKey("TEST_KEY", TEST_KEY_PATH); + store.saveStore(); + List expectedMetas = + List.of(new CategoricalMeta("\\foo\\1\\", 2, 2, List.of("lav", "val")), new NumericMeta("\\foo\\2\\", 2, 2, 0.0, 99.0)); + verifyStoreMeta(expectedMetas, testDir.getAbsolutePath() + "/columnMeta.javabin"); + } + + @Test + void shouldNotProcessCharAndInt(@TempDir File testDir) throws IOException, ClassNotFoundException { + String content = """ + PATIENT_NUM,CONCEPT_PATH,NUMERIC_VALUE,TEXT_VALUE,DATETIME + 1,\\foo\\1\\,,val,1 + 1,\\foo\\1\\,0,,1 + """; + Path csvPath = Path.of(testDir.getAbsolutePath(), "test.csv"); + Files.writeString(csvPath, content); + LowRAMLoadingStore store = new LowRAMLoadingStore( + testDir.getAbsolutePath() + "/allObservationsTemp.javabin", testDir.getAbsolutePath() + "/columnMeta.javabin", + testDir.getAbsolutePath() + "/allObservationsStore.javabin", "TEST_KEY" + ); + + IngestStatus status = new LowRAMCSVProcessor(store, false, 1D).process(csvPath.toFile()); + + Assertions.assertEquals(1, status.conceptCount()); + Assertions.assertEquals(3, status.lineCount()); + Assertions.assertEquals(csvPath, status.file()); + + Crypto.loadKey("TEST_KEY", TEST_KEY_PATH); + store.saveStore(); + List expectedMetas = List.of( + new CategoricalMeta("\\foo\\1\\", 1, 1, List.of("val")) // make sure "1" doesn't show up + ); + verifyStoreMeta(expectedMetas, testDir.getAbsolutePath() + "/columnMeta.javabin"); + } + + @Test + void shouldNotProcessIntAndChar(@TempDir File testDir) throws IOException, ClassNotFoundException { + String content = """ + PATIENT_NUM,CONCEPT_PATH,NUMERIC_VALUE,TEXT_VALUE,DATETIME + 1,\\foo\\1\\,0,,1 + 2,\\foo\\1\\,1,,1 + 1,\\foo\\1\\,,val,1 + """; + Path csvPath = Path.of(testDir.getAbsolutePath(), "test.csv"); + Files.writeString(csvPath, content); + LowRAMLoadingStore store = new LowRAMLoadingStore( + testDir.getAbsolutePath() + "/allObservationsTemp.javabin", testDir.getAbsolutePath() + "/columnMeta.javabin", + testDir.getAbsolutePath() + "/allObservationsStore.javabin", "TEST_KEY" + ); + + IngestStatus status = new LowRAMCSVProcessor(store, false, 1D).process(csvPath.toFile()); + + Assertions.assertEquals(1, status.conceptCount()); + Assertions.assertEquals(4, status.lineCount()); + Assertions.assertEquals(csvPath, status.file()); + + Crypto.loadKey("TEST_KEY", TEST_KEY_PATH); + store.saveStore(); + List expectedMetas = List.of( + new NumericMeta("\\foo\\1\\", 2, 2, 0.0, 1.0) // make sure "1" doesn't show up + ); + verifyStoreMeta(expectedMetas, testDir.getAbsolutePath() + "/columnMeta.javabin"); + } + + @Test + void shouldProcessLargeFile(@TempDir File testDir) throws IOException, ClassNotFoundException { + Path 
csvPath = Path.of(testDir.getAbsolutePath(), "test.csv"); + + // Create a ~200M file sorted by patient + String header = "PATIENT_NUM,CONCEPT_PATH,NUMERIC_VALUE,TEXT_VALUE,DATETIME\n"; + Files.writeString(csvPath, header); + // 5 char patient col + 22 char concept col + 1 char numeric value + 10 char date + 4 commas + newline = + // 43 chars of ascii-utf8 = 43 bytes per line, except the debugger says 44 so uh 44 it is + // I want a file that splits into 2 chunks. The chunk size is set to 0.1 G, so 2 * (0.1 * 1024^3)/44 + // So we need 4880644 lines of this to make a 0.2G file that splits into 2 chunks + // ... give or take. I shot a little under + try (FileWriter fw = new FileWriter(csvPath.toString(), true); BufferedWriter writer = new BufferedWriter(fw)) { + for (int line = 0; line < 4880000; line++) { + int patient = line / 100; + int concept = line % 100; + int val = line % 9; + String date = "1739329199"; + String row = String.format("%05d", patient) + ",\\my\\concept\\path\\" + String.format("%05d", concept) + "\\," + val + ",," + + date + '\n'; + writer.write(row); + if (line % 1000000 == 0) { + System.out.println("Wrote line: " + line); + } + } + } + + LowRAMLoadingStore store = new LowRAMLoadingStore( + testDir.getAbsolutePath() + "/allObservationsTemp.javabin", testDir.getAbsolutePath() + "/columnMeta.javabin", + testDir.getAbsolutePath() + "/allObservationsStore.javabin", "TEST_KEY" + ); + + IngestStatus status = new LowRAMCSVProcessor(store, false, 0.1).process(csvPath.toFile()); + Assertions.assertEquals(4880001, status.lineCount()); + Assertions.assertEquals(100, status.conceptCount()); + Assertions.assertEquals(csvPath, status.file()); + + Crypto.loadKey("TEST_KEY", TEST_KEY_PATH); + store.saveStore(); + List expectedMetas = List.of( + new NumericMeta("\\my\\concept\\path\\00000\\", 24399, 24399, 0.0, 8.0), + new NumericMeta("\\my\\concept\\path\\00099\\", 24400, 24400, 0.0, 8.0) + ); + verifyStoreMeta(expectedMetas, testDir.getAbsolutePath() + "/columnMeta.javabin"); + } + + private void verifyStoreMeta(List expectedMetas, String columnMetaPath) throws IOException, ClassNotFoundException { + ObjectInputStream objectInputStream = new ObjectInputStream(new GZIPInputStream(new FileInputStream(columnMetaPath))); + TreeMap metaStore = (TreeMap) objectInputStream.readObject(); + for (Meta expectedMeta : expectedMetas) { + ColumnMeta actualMeta = metaStore.get(expectedMeta.key()); + Assertions.assertNotNull(actualMeta); + Assertions.assertEquals(expectedMeta.patientCount(), actualMeta.getPatientCount()); + Assertions.assertEquals(expectedMeta.conceptCount(), actualMeta.getObservationCount()); + if (expectedMeta instanceof NumericMeta expectedNumeric) { + Assertions.assertEquals(expectedNumeric.min(), actualMeta.getMin()); + Assertions.assertEquals(expectedNumeric.max(), actualMeta.getMax()); + } else if (expectedMeta instanceof CategoricalMeta expectedCategorical) { + Assertions.assertEquals(expectedCategorical.values(), actualMeta.getCategoryValues()); + } + } + } + + private sealed interface Meta permits NumericMeta, CategoricalMeta { + String key(); + + int patientCount(); + + int conceptCount(); + } + private record NumericMeta(String key, int patientCount, int conceptCount, Double min, Double max) implements Meta { + } + private record CategoricalMeta(String key, int patientCount, int conceptCount, List values) implements Meta { + } +} diff --git a/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoaderTest.java 
b/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoaderTest.java new file mode 100644 index 00000000..200c214e --- /dev/null +++ b/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoaderTest.java @@ -0,0 +1,48 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Path; + +class LowRAMMultiCSVLoaderTest { + + @Test + void shouldFilterOutNonCSVs(@TempDir File testDir) throws IOException { + Path csvPath = Path.of(testDir.getAbsolutePath(), "test.txt"); + RandomAccessFile largeFile = new RandomAccessFile(csvPath.toString(), "rw"); + largeFile.setLength(6L*1024); + + LowRAMCSVProcessor processor = Mockito.mock(LowRAMCSVProcessor.class); + LowRAMLoadingStore store = Mockito.mock(LowRAMLoadingStore.class); + + LowRAMMultiCSVLoader subject = new LowRAMMultiCSVLoader(store, processor, testDir.getAbsolutePath()); + int actual = subject.processCSVsFromHPDSDir(); + + Assertions.assertEquals(0, actual); + Mockito.verify(processor, Mockito.times(0)).process(Mockito.any()); + } + + @Test + void shouldProcessSmallCSVs(@TempDir File testDir) throws IOException { + Path csvPath = Path.of(testDir.getAbsolutePath(), "test.csv"); + RandomAccessFile largeFile = new RandomAccessFile(csvPath.toString(), "rw"); + largeFile.setLength(6L*1024); + + LowRAMCSVProcessor processor = Mockito.mock(LowRAMCSVProcessor.class); + Mockito.when(processor.process(Mockito.any())) + .thenReturn(new IngestStatus(csvPath, 10, 10, 10L)); + LowRAMLoadingStore store = Mockito.mock(LowRAMLoadingStore.class); + + LowRAMMultiCSVLoader subject = new LowRAMMultiCSVLoader(store, processor, testDir.getAbsolutePath()); + int actual = subject.processCSVsFromHPDSDir(); + + Assertions.assertEquals(0, actual); + Mockito.verify(processor, Mockito.times(1)).process(Mockito.any()); + } +} \ No newline at end of file diff --git a/etl/src/test/resources/test_named_encryption_key b/etl/src/test/resources/test_named_encryption_key new file mode 100644 index 00000000..d3720b6d --- /dev/null +++ b/etl/src/test/resources/test_named_encryption_key @@ -0,0 +1 @@ +1111111111111111 \ No newline at end of file diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java index 4b91cece..71a75424 100644 --- a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java +++ b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java @@ -4,7 +4,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore; import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader; import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.VariantMetadataLoader; -import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.CSVLoader; +import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv.CSVLoader; import java.io.IOException; From d382fe105befc481a9cc0eec6a21cfa1e30c7ad4 Mon Sep 17 00:00:00 2001 From: Luke Sikina Date: Fri, 14 Feb 2025 06:00:24 -0500 Subject: [PATCH 2/2] [CHORE] Add spotless to pom --- pom.xml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) 
diff --git a/pom.xml b/pom.xml
index 917fc150..f528d2f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,6 +31,7 @@
 		UTF-8
 		1.4.10
 		2.20.153
+		<spotless.version>2.41.1</spotless.version>
@@ -105,6 +106,20 @@
 				2.0
+			<plugin>
+				<groupId>com.diffplug.spotless</groupId>
+				<artifactId>spotless-maven-plugin</artifactId>
+				<version>${spotless.version}</version>
+				<configuration>
+					<java>
+						<eclipse>
+							<version>4.26</version>
+							<file>code-formatting/eclipse-formatter.xml</file>
+						</eclipse>
+					</java>
+				</configuration>
+			</plugin>
+
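
Reviewer note: the sketch below shows how the new litecsv classes in this patch could be wired together, mirroring LowRAMCSVProcessorTest and LowRAMMultiCSVLoaderTest. It is an illustration only. The /opt/local/hpds paths, the ENCRYPTION_KEY name, the 5.0 chunk size, and the meaning of the boolean processor flag are assumptions, not part of the patch; only the constructors, processCSVsFromHPDSDir(), saveStore(), and Crypto.loadKey() calls exercised by the tests are taken from it.

    package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv;

    import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto;

    public class LowRAMLoaderSketch {
        public static void main(String[] args) throws Exception {
            String hpdsDir = "/opt/local/hpds/";                          // assumed working directory
            Crypto.loadKey("ENCRYPTION_KEY", hpdsDir + "encryption_key"); // named key, as in the tests

            LowRAMLoadingStore store = new LowRAMLoadingStore(
                hpdsDir + "allObservationsTemp.javabin",                  // temporary observation store
                hpdsDir + "columnMeta.javabin",                           // concept metadata
                hpdsDir + "allObservationsStore.javabin",                 // final encrypted store
                "ENCRYPTION_KEY"
            );

            // The tests pass a boolean flag and a chunk size in GB; the flag's meaning is not
            // shown in this patch, so false is used here purely to match the test pattern.
            LowRAMCSVProcessor processor = new LowRAMCSVProcessor(store, false, 5.0);

            // Processes each *.csv in hpdsDir; the tests expect 0 on success.
            int status = new LowRAMMultiCSVLoader(store, processor, hpdsDir).processCSVsFromHPDSDir();

            // The unit tests call saveStore() explicitly after processing; if the loader already
            // persists the store itself, this call may be redundant.
            store.saveStore();

            System.exit(status);
        }
    }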