From f570131d79cf2fef343ce69e1c88a2aecba7ab29 Mon Sep 17 00:00:00 2001 From: Luke Sikina Date: Tue, 4 Feb 2025 14:28:58 -0500 Subject: [PATCH] [ALS-7113] Enhance sequential loader --- docker/pic-sure-hpds-etl/Dockerfile | 1 + etl/pom.xml | 28 ++- .../data/genotype/util/RemapPatientIds.java | 17 -- .../data/phenotype/util/DumpSourceCSV.java | 2 +- .../util/FixCategoricalConcepts.java | 2 +- .../data/phenotype/util/RekeyDataset.java | 2 +- .../hpds/data/phenotype/util/RemapIds.java | 2 +- .../data/phenotype/util/RenameCategories.java | 2 +- .../etl/{phenotype => }/LoadingStore.java | 2 +- .../etl/phenotype/{ => csv}/CSVLoader.java | 3 +- .../{ => csv}/CSVLoaderNewSearch.java | 51 +--- .../hpds/etl/phenotype/csv/CSVParserUtil.java | 55 ++++ .../etl/phenotype/litecsv/IngestStatus.java | 6 + .../phenotype/litecsv/LowRAMCSVProcessor.java | 169 +++++++++++++ .../phenotype/litecsv/LowRAMLoadingStore.java | 237 ++++++++++++++++++ .../litecsv/LowRAMMultiCSVLoader.java | 70 ++++++ .../{ => sequential}/PhenoRecord.java | 2 +- .../{ => sequential}/SequentialLoader.java | 85 ++++++- .../SequentialLoadingStore.java | 2 +- .../etl/phenotype/{ => sql}/SQLLoader.java | 9 +- .../litecsv/LowRAMCSVProcessorTest.java | 186 ++++++++++++++ .../litecsv/LowRAMMultiCSVLoaderTest.java | 48 ++++ .../test/resources/test_named_encryption_key | 1 + .../util/BuildIntegrationTestEnvironment.java | 2 +- 24 files changed, 901 insertions(+), 83 deletions(-) rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/{phenotype => }/LoadingStore.java (99%) rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/{ => csv}/CSVLoader.java (97%) rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/{ => csv}/CSVLoaderNewSearch.java (62%) create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVParserUtil.java create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/IngestStatus.java create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessor.java create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMLoadingStore.java create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoader.java rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/{ => sequential}/PhenoRecord.java (97%) rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/{ => sequential}/SequentialLoader.java (68%) rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/{ => sequential}/SequentialLoadingStore.java (99%) rename etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/{ => sql}/SQLLoader.java (96%) create mode 100644 etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessorTest.java create mode 100644 etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoaderTest.java create mode 100644 etl/src/test/resources/test_named_encryption_key diff --git a/docker/pic-sure-hpds-etl/Dockerfile b/docker/pic-sure-hpds-etl/Dockerfile index f160b120..661b98a1 100644 --- a/docker/pic-sure-hpds-etl/Dockerfile +++ b/docker/pic-sure-hpds-etl/Dockerfile @@ -28,6 +28,7 @@ COPY --from=build /app/docker/pic-sure-hpds-etl/RekeyDataset-jar-with-dependenci COPY --from=build /app/docker/pic-sure-hpds-etl/RemoveConceptFromMetadata-jar-with-dependencies.jar . 
COPY --from=build /app/docker/pic-sure-hpds-etl/HideAnnotationCategoryValue-jar-with-dependencies.jar . COPY --from=build /app/docker/pic-sure-hpds-etl/SequentialLoader-jar-with-dependencies.jar . +COPY --from=build /app/docker/pic-sure-hpds-etl/LowRAMMultiCSVLoader-jar-with-dependencies.jar . COPY --from=build /app/docker/pic-sure-hpds-etl/create_key.sh . ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -Xmx${HEAPSIZE:-2048}m -jar ${LOADER_NAME:-CSVLoader}-jar-with-dependencies.jar"] diff --git a/etl/pom.xml b/etl/pom.xml index c218d06a..64f087ca 100644 --- a/etl/pom.xml +++ b/etl/pom.xml @@ -135,7 +135,7 @@ - edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.CSVLoaderNewSearch + edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv.CSVLoaderNewSearch ${project.basedir}/../docker/pic-sure-hpds-etl @@ -155,7 +155,7 @@ - edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.CSVLoader + edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv.CSVLoader ${project.basedir}/../docker/pic-sure-hpds-etl @@ -175,7 +175,7 @@ - edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.SQLLoader + edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.sql.SQLLoader ${project.basedir}/../docker/pic-sure-hpds-etl @@ -195,7 +195,7 @@ - edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.SequentialLoader + edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.sequential.SequentialLoader ${project.basedir}/../docker/pic-sure-hpds-etl @@ -210,6 +210,26 @@ single + + buildLiteCSVLoader + + + + edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv.LowRAMMultiCSVLoader + + + ${project.basedir}/../docker/pic-sure-hpds-etl + + jar-with-dependencies + + LowRAMMultiCSVLoader + LowRAMMultiCSVLoader + + package + + single + + buildMultialleleCounter diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java index 0bd73458..22605f5d 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/RemapPatientIds.java @@ -1,39 +1,22 @@ package edu.harvard.hms.dbmi.avillach.hpds.data.genotype.util; -import java.io.ByteArrayInputStream; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FileReader; -import java.io.FileWriter; import java.io.IOException; -import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.io.RandomAccessFile; -import java.util.ArrayList; -import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; -import java.util.Map.Entry; -import java.util.concurrent.ExecutionException; -import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableList; import de.siegmar.fastcsv.reader.CsvContainer; import de.siegmar.fastcsv.reader.CsvReader; import de.siegmar.fastcsv.reader.CsvRow; -import de.siegmar.fastcsv.writer.CsvWriter; -import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; -import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; -import 
edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.LoadingStore; public class RemapPatientIds { protected static LoadingCache> store; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/DumpSourceCSV.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/DumpSourceCSV.java index 585204d8..03d892d2 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/DumpSourceCSV.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/DumpSourceCSV.java @@ -21,7 +21,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; -import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.LoadingStore; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; public class DumpSourceCSV { protected static LoadingCache> store; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/FixCategoricalConcepts.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/FixCategoricalConcepts.java index 579d8a16..a651f397 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/FixCategoricalConcepts.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/FixCategoricalConcepts.java @@ -18,7 +18,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; -import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.LoadingStore; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; @SuppressWarnings({"unchecked", "rawtypes"}) public class FixCategoricalConcepts { diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RekeyDataset.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RekeyDataset.java index 6d28bca8..75b35ce3 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RekeyDataset.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RekeyDataset.java @@ -18,7 +18,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; -import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.LoadingStore; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; @SuppressWarnings({"unchecked"}) public class RekeyDataset { diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RemapIds.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RemapIds.java index e8cef09d..6733bc6d 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RemapIds.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RemapIds.java @@ -18,7 +18,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; -import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.LoadingStore; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; @SuppressWarnings({"unchecked", "rawtypes"}) public class RemapIds { diff 
--git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RenameCategories.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RenameCategories.java index 1711370b..268844e8 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RenameCategories.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/phenotype/util/RenameCategories.java @@ -20,7 +20,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; -import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.LoadingStore; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; public class RenameCategories { diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/LoadingStore.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/LoadingStore.java similarity index 99% rename from etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/LoadingStore.java rename to etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/LoadingStore.java index f857936e..62b5b8e9 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/LoadingStore.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/LoadingStore.java @@ -1,4 +1,4 @@ -package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype; +package edu.harvard.hms.dbmi.avillach.hpds.etl; import java.io.*; import java.nio.file.Files; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/CSVLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVLoader.java similarity index 97% rename from etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/CSVLoader.java rename to etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVLoader.java index 5a1e236f..cb7301f6 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/CSVLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVLoader.java @@ -1,9 +1,10 @@ -package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype; +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv; import java.io.*; import java.util.Date; import java.util.concurrent.ExecutionException; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVRecord; import org.slf4j.Logger; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/CSVLoaderNewSearch.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVLoaderNewSearch.java similarity index 62% rename from etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/CSVLoaderNewSearch.java rename to etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVLoaderNewSearch.java index 1c5e2d8a..4b86232d 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/CSVLoaderNewSearch.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVLoaderNewSearch.java @@ -1,7 +1,8 @@ -package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype; +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv; import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; import 
org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVRecord; import org.slf4j.Logger; @@ -17,16 +18,6 @@ public class CSVLoaderNewSearch { private static final Logger log = LoggerFactory.getLogger(CSVLoaderNewSearch.class); - private static final int PATIENT_NUM = 0; - - private static final int CONCEPT_PATH = 1; - - private static final int NUMERIC_VALUE = 2; - - private static final int TEXT_VALUE = 3; - - private static final int DATETIME = 4; - private static boolean DO_VARNAME_ROLLUP = false; private static final String HPDS_DIRECTORY = "/opt/local/hpds/"; @@ -56,24 +47,24 @@ private static void initialLoad() throws IOException { private static void processRecord(final PhenoCube[] currentConcept, CSVRecord record) { if (record.size() < 4) { - log.info("Record number {} had less records than we expected so we are skipping it.", record.getRecordNumber()); + log.info("Record number {} had less records than we expected so we are skipping it.", record.getRecordNumber()); return; } - String conceptPath = getSanitizedConceptPath(record); - String numericValue = record.get(NUMERIC_VALUE); + String conceptPath = CSVParserUtil.parseConceptPath(record, DO_VARNAME_ROLLUP); + String numericValue = record.get(CSVParserUtil.NUMERIC_VALUE); boolean isAlpha = (numericValue == null || numericValue.isEmpty()); - String value = isAlpha ? record.get(TEXT_VALUE) : numericValue; + String value = isAlpha ? record.get(CSVParserUtil.TEXT_VALUE) : numericValue; currentConcept[0] = getPhenoCube(currentConcept[0], conceptPath, isAlpha); if (value != null && !value.trim().isEmpty() && ((isAlpha && currentConcept[0].vType == String.class) || (!isAlpha && currentConcept[0].vType == Double.class))) { value = value.trim(); currentConcept[0].setColumnWidth(isAlpha ? Math.max(currentConcept[0].getColumnWidth(), value.getBytes().length) : Double.BYTES); - int patientId = Integer.parseInt(record.get(PATIENT_NUM)); + int patientId = Integer.parseInt(record.get(CSVParserUtil.PATIENT_NUM)); Date date = null; - if (record.size() > 4 && record.get(DATETIME) != null && !record.get(DATETIME).isEmpty()) { - date = new Date(Long.parseLong(record.get(DATETIME))); + if (record.size() > 4 && record.get(CSVParserUtil.DATETIME) != null && !record.get(CSVParserUtil.DATETIME).isEmpty()) { + date = new Date(Long.parseLong(record.get(CSVParserUtil.DATETIME))); } currentConcept[0].add(patientId, isAlpha ? value : Double.parseDouble(value), date); store.allIds.add(patientId); @@ -91,28 +82,4 @@ private static PhenoCube getPhenoCube(PhenoCube currentConcept, String conceptPa return currentConcept; } - - private static String getSanitizedConceptPath(CSVRecord record) { - String conceptPathFromRow = record.get(CONCEPT_PATH); - String[] segments = conceptPathFromRow.split("\\\\"); - for (int x = 0; x < segments.length; x++) { - segments[x] = segments[x].trim(); - } - conceptPathFromRow = String.join("\\", segments) + "\\"; - conceptPathFromRow = conceptPathFromRow.replaceAll("\\ufffd", ""); - String textValueFromRow = record.get(TEXT_VALUE) == null ? null : record.get(TEXT_VALUE).trim(); - if (textValueFromRow != null) { - textValueFromRow = textValueFromRow.replaceAll("\\ufffd", ""); - } - String conceptPath; - - if (DO_VARNAME_ROLLUP) { - conceptPath = conceptPathFromRow.endsWith("\\" + textValueFromRow + "\\") ?
conceptPathFromRow.replaceAll("\\\\[^\\\\]*\\\\$", "\\\\") : conceptPathFromRow; - } else { - conceptPath = conceptPathFromRow; - } - return conceptPath; - } - - } diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVParserUtil.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVParserUtil.java new file mode 100644 index 00000000..9f5181a7 --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/csv/CSVParserUtil.java @@ -0,0 +1,55 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv; + +import org.apache.commons.csv.CSVRecord; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Date; +import java.util.stream.Collectors; + +public class CSVParserUtil { + public static final int PATIENT_NUM = 0; + public static final int CONCEPT_PATH = 1; + public static final int NUMERIC_VALUE = 2; + public static final int TEXT_VALUE = 3; + public static final int DATETIME = 4; + + public static String parseConceptPath(CSVRecord record, boolean doVarNameRollup) { + String conceptPathFromRow = record.get(CONCEPT_PATH); + conceptPathFromRow = Arrays.stream(conceptPathFromRow.split("\\\\")) + .map(String::trim) + .collect(Collectors.joining("\\")) + "\\"; + conceptPathFromRow = stripWeirdUnicodeChars(conceptPathFromRow); + + // \\ufffd = � + String textValueFromRow = stripWeirdUnicodeChars(trim(record.get(TEXT_VALUE))); + if (doVarNameRollup && conceptPathFromRow.endsWith("\\" + textValueFromRow + "\\")) { + // This regex deletes the last node from the concept path, i.e. "rolling it up" + return conceptPathFromRow.replaceAll("\\\\[^\\\\]*\\\\$", "\\\\"); + } else { + return conceptPathFromRow; + } + } + + private static String stripWeirdUnicodeChars(@Nonnull String raw) { + return raw.replaceAll("\\ufffd", ""); + } + + public static String trim(@Nullable String maybeString) { + return maybeString == null ? 
"" : maybeString.trim(); + } + + public static @Nullable Date parseDate(CSVRecord record) { + Date date = null; + try { + if (record.size() > 4 && record.get(DATETIME) != null && !record.get(DATETIME).isEmpty()) { + date = new Date(Long.parseLong(record.get(DATETIME))); + } + } catch (NumberFormatException e) { + return null; + } + + return date; + } +} diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/IngestStatus.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/IngestStatus.java new file mode 100644 index 00000000..33748767 --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/IngestStatus.java @@ -0,0 +1,6 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv; + +import java.nio.file.Path; + +public record IngestStatus(Path file, long lineCount, int conceptCount, long duration) { +} diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessor.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessor.java new file mode 100644 index 00000000..bd233edd --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessor.java @@ -0,0 +1,169 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv; + +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; +import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv.CSVParserUtil; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.logging.log4j.util.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.*; +import java.nio.file.Files; +import java.util.*; +import java.util.stream.Stream; + +public class LowRAMCSVProcessor { + + private static final Logger log = LoggerFactory.getLogger(LowRAMCSVProcessor.class); + private final LowRAMLoadingStore store; + private final boolean doVarNameRollup; + private final double maxChunkSizeGigs; + + public LowRAMCSVProcessor(LowRAMLoadingStore store, boolean doVarNameRollup, double maxChunkSizeGigs) { + this.store = store; + this.doVarNameRollup = doVarNameRollup; + this.maxChunkSizeGigs = maxChunkSizeGigs; + } + + public IngestStatus process(File csv) { + long startTime = System.nanoTime(); + log.info("Attempting to ingest file {}", csv.getAbsolutePath()); + try (Reader r = new FileReader(csv); Stream rawLines = Files.lines(csv.toPath())) { + CSVParser parser = CSVFormat.DEFAULT + .withSkipHeaderRecord() + .withFirstRecordAsHeader() + .parse(new BufferedReader(r)); + + // we want to read the file in reasonably sized chunks so that we can handle chunks naively + // in memory without going OOM + // to do this, we're going to assume that over the course of thousands of lines, each line + // is more or less the same length + // so we'll just provision n chunks, where n is max(1, file_size/max_chunk_size) + log.info("Gathering stats about file {}", csv.getName()); + int chunks = Math.max(1, (int)Math.ceil((double) csv.length() / (maxChunkSizeGigs*1024*1024*1024))); + final long totalLineCount = rawLines.count(); + final long linesPerChunk = totalLineCount / chunks; + log.info( + "File {} is {} bytes and {} lines. 
Dividing into {} chunks of {} lines each", + csv.getName(), csv.length(), totalLineCount, chunks, linesPerChunk + ); + long chunkLineCount = 0; + long lineCount = 0; + int chunkCount = 0; + Set concepts = new HashSet<>(); + List lines = new ArrayList<>(); + + log.info("Creating chunks"); + for (CSVRecord record : parser) { + chunkLineCount++; + lineCount++; + lines.add(record); + if (chunkLineCount > linesPerChunk || lineCount + 1 == totalLineCount ) { + log.info("Finished creating chunk {}", chunkCount); + // sort by concept to prevent cache thrashing when ingesting + // loading each concept in its entirety for a chunk will minimize disk IO and + // let us keep more valuable things in RAM + lines.sort(Comparator.comparing(a -> a.get(1))); + log.info("Finished sorting chunk {}", chunkCount); + Set chunkConcepts = ingest(lines); + concepts.addAll(chunkConcepts); + log.info("Finished ingesting chunk {} with {} unique concepts", chunkCount, chunkConcepts.size()); + lines = new ArrayList<>(); + chunkLineCount = 0; + chunkCount++; + } + } + + return new IngestStatus( + csv.toPath(), + totalLineCount, + concepts.size(), + System.nanoTime() - startTime + ); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + + private Set ingest(List sortedRecords) { + Set concepts = new HashSet<>(); + for (CSVRecord record : sortedRecords) { + if (record.size() < 4) { + log.info("Record #{} has too few columns, skipping.", record.getRecordNumber()); + continue; + } + + String conceptPath = CSVParserUtil.parseConceptPath(record, doVarNameRollup); + IngestFn ingestFn = Strings.isEmpty(record.get(CSVParserUtil.NUMERIC_VALUE)) ? this::ingestNonNumeric : this::ingestNumeric; + Date date = CSVParserUtil.parseDate(record); + int patientId = Integer.parseInt(record.get(CSVParserUtil.PATIENT_NUM)); + if (ingestFn.attemptIngest(record, conceptPath, patientId, date)) { + concepts.add(conceptPath); + } else { + log.warn("Could not ingest record #{}", record.getRecordNumber()); + } + } + return concepts; + } + + @FunctionalInterface + private interface IngestFn { + boolean attemptIngest(CSVRecord record, String path, int patientId, @Nullable Date date); + } + + private boolean ingestNumeric(CSVRecord record, String conceptPath, int patientId, Date date) { + PhenoCube concept = store.loadingCache.getIfPresent(conceptPath); + if (concept == null) { + concept = new PhenoCube<>(conceptPath, Double.class); + concept.setColumnWidth(Double.BYTES); + store.loadingCache.put(conceptPath, concept); + } + if (!concept.vType.equals(Double.class)) { + log.error(""" + Concept bucket {} was configured for non-numeric types, but received numeric value {} + This happens when, for a single concept, there are rows that have a tval_char, and other + rows with an nval_num but no tval_char.
+ Offending record #{} + """, conceptPath, record.get(CSVParserUtil.NUMERIC_VALUE), record.getRecordNumber()); + return false; + } + try { + String rawNumericValue = CSVParserUtil.trim(record.get(CSVParserUtil.NUMERIC_VALUE)); + double parsedValue = Double.parseDouble(rawNumericValue); + concept.add(patientId, parsedValue, date); + return true; + } catch (NumberFormatException e) { + log.warn("Could not parse numeric value in line {}", record); + } + return false; + } + + private boolean ingestNonNumeric(CSVRecord record, String conceptPath, int patientId, Date date) { + PhenoCube concept = store.loadingCache.getIfPresent(conceptPath); + if (concept == null) { + concept = new PhenoCube<>(conceptPath, String.class); + store.loadingCache.put(conceptPath, concept); + } + if (!concept.vType.equals(String.class)) { + log.error(""" + Concept bucket {} was configured for numeric types, but received non-numeric value {} + This happens when, for a single concept, there are rows that have a tval_char, and other + rows with an nval_num but no tval_char. + Offending record #{} + """, conceptPath, record.get(CSVParserUtil.TEXT_VALUE), record.getRecordNumber()); + return false; + } + String rawTextValue = CSVParserUtil.trim(record.get(CSVParserUtil.TEXT_VALUE)); + if (rawTextValue.isEmpty()) { + return false; + } + concept.setColumnWidth(Math.max(rawTextValue.getBytes().length, concept.getColumnWidth())); + concept.add(patientId, rawTextValue, date); + return true; + } +} diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMLoadingStore.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMLoadingStore.java new file mode 100644 index 00000000..42822783 --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMLoadingStore.java @@ -0,0 +1,237 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv; + +import com.google.common.cache.*; +import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue; +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.*; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import static edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto.DEFAULT_KEY_NAME; + +/** + * This class provides similar functionality to the LoadingStore class, but is designed with sequential loading in mind. + * + * This will write out partial cubes to individual files instead of assuming that they can immediately be + * sequenced into the allObservationStore; This will allow us to pick them back up and add more patients + * to existing stores so that we can better handle fragmented data.
+ * + * + * @author nchu + * + */ +public class LowRAMLoadingStore { + + private final String COLUMNMETA_FILENAME; + protected final String OBSERVATIONS_FILENAME; + protected final String OBS_TEMP_FILENAME; + private final String encryptionKeyName; + + private final RandomAccessFile allObservationsTemp; + + TreeMap metadataMap = new TreeMap<>(); + + private static Logger log = LoggerFactory.getLogger(LowRAMLoadingStore.class); + + public LowRAMLoadingStore() { + this( + "/opt/local/hpds/allObservationsTemp.javabin", + "/opt/local/hpds/columnMeta.javabin", + "/opt/local/hpds/allObservationsStore.javabin", + DEFAULT_KEY_NAME + ); + } + + public LowRAMLoadingStore(String observationsTempFile, String columnMetaTempFile, String observationsPermFile, String encryptionKeyName) { + OBS_TEMP_FILENAME = observationsTempFile; + COLUMNMETA_FILENAME = columnMetaTempFile; + OBSERVATIONS_FILENAME = observationsPermFile; + this.encryptionKeyName = encryptionKeyName; + try { + allObservationsTemp = new RandomAccessFile(OBS_TEMP_FILENAME, "rw"); + } catch (FileNotFoundException e) { + throw new UncheckedIOException(e); + } + } + + public LoadingCache loadingCache = CacheBuilder.newBuilder() + .maximumSize(1) + .removalListener(new RemovalListener() { + + @Override + public void onRemoval(RemovalNotification cubeRemoval) { + if(cubeRemoval.getValue().getLoadingMap()!=null) { + try( + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(byteStream) + ) { + ColumnMeta columnMeta = new ColumnMeta().setName(cubeRemoval.getKey()).setWidthInBytes(cubeRemoval.getValue().getColumnWidth()).setCategorical(cubeRemoval.getValue().isStringType()); + columnMeta.setAllObservationsOffset(allObservationsTemp.getFilePointer()); + //write out the basic key/value map for loading; this will be compacted and finalized after all concepts are read in. 
+ out.writeObject(cubeRemoval.getValue().getLoadingMap()); out.flush(); + + allObservationsTemp.write(byteStream.toByteArray()); + columnMeta.setAllObservationsLength(allObservationsTemp.getFilePointer()); + metadataMap.put(columnMeta.getName(), columnMeta); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + } + }) + .build( + new CacheLoader<>() { + public PhenoCube load(String key) throws Exception { + ColumnMeta columnMeta = metadataMap.get(key); + if (columnMeta != null) { + log.debug("Loading concept : [" + key + "]"); + return getCubeFromTemp(columnMeta); + } else { + return null; + } + } + }); + + public TreeSet allIds = new TreeSet(); + + public void saveStore() throws FileNotFoundException, IOException, ClassNotFoundException { + log.info("flushing temp storage"); + loadingCache.invalidateAll(); + loadingCache.cleanUp(); + + RandomAccessFile allObservationsStore = new RandomAccessFile(OBSERVATIONS_FILENAME, "rw"); + //we dumped it all in a temp file; now sort all the data and compress it into the real Store + for(String concept : metadataMap.keySet()) { + ColumnMeta columnMeta = metadataMap.get(concept); + log.debug("Writing concept : [{}]", concept); + PhenoCube cube = getCubeFromTemp(columnMeta); + complete(columnMeta, cube); + write(allObservationsStore, columnMeta, cube); + } + allObservationsStore.close(); + + log.info("Writing metadata"); + ObjectOutputStream metaOut = new ObjectOutputStream(new GZIPOutputStream(new FileOutputStream(new File(COLUMNMETA_FILENAME)))); + metaOut.writeObject(metadataMap); + metaOut.writeObject(allIds); + metaOut.flush(); + metaOut.close(); + + log.info("Cleaning up temporary file"); + + allObservationsTemp.close(); + File tempFile = new File(OBS_TEMP_FILENAME); + tempFile.delete(); + } + + private void write(RandomAccessFile allObservationsStore, ColumnMeta columnMeta, PhenoCube cube) throws IOException { + columnMeta.setAllObservationsOffset(allObservationsStore.getFilePointer()); + + try(ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(byteStream);) { + + out.writeObject(cube); out.flush(); + allObservationsStore.write(Crypto.encryptData(encryptionKeyName, byteStream.toByteArray())); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + columnMeta.setAllObservationsLength(allObservationsStore.getFilePointer()); + } + + private PhenoCube getCubeFromTemp(ColumnMeta columnMeta) throws IOException, ClassNotFoundException { + allObservationsTemp.seek(columnMeta.getAllObservationsOffset()); + int length = (int) (columnMeta.getAllObservationsLength() - columnMeta.getAllObservationsOffset()); + byte[] buffer = new byte[length]; + allObservationsTemp.read(buffer); + allObservationsTemp.seek(allObservationsTemp.length()); + ObjectInputStream inStream = new ObjectInputStream(new ByteArrayInputStream(buffer)); + + PhenoCube cube = new PhenoCube(columnMeta.getName() , columnMeta.isCategorical() ? 
String.class : Double.class); + cube.setLoadingMap((List)inStream.readObject()); + cube.setColumnWidth(columnMeta.getWidthInBytes()); + inStream.close(); + return cube; + } + + private > void complete(ColumnMeta columnMeta, PhenoCube cube) { + ArrayList> entryList = new ArrayList>( + cube.getLoadingMap().stream().map((entry)->{ + return new KeyAndValue(entry.getKey(), entry.getValue(), entry.getTimestamp()); + }).collect(Collectors.toList())); + + List> sortedByKey = entryList.stream() + .sorted(Comparator.comparing(KeyAndValue::getKey)) + .collect(Collectors.toList()); + cube.setSortedByKey(sortedByKey.toArray(new KeyAndValue[0])); + + if(cube.isStringType()) { + TreeMap> categoryMap = new TreeMap<>(); + for(KeyAndValue entry : cube.sortedByValue()) { + if(!categoryMap.containsKey(entry.getValue())) { + categoryMap.put(entry.getValue(), new LinkedList()); + } + categoryMap.get(entry.getValue()).add(entry.getKey()); + } + TreeMap> categorySetMap = new TreeMap<>(); + categoryMap.entrySet().stream().forEach((entry)->{ + categorySetMap.put(entry.getKey(), new TreeSet(entry.getValue())); + }); + cube.setCategoryMap(categorySetMap); + } + + columnMeta.setObservationCount(cube.sortedByKey().length); + columnMeta.setPatientCount(Arrays.stream(cube.sortedByKey()).map((kv)->{return kv.getKey();}).collect(Collectors.toSet()).size()); + if(columnMeta.isCategorical()) { + columnMeta.setCategoryValues(new ArrayList(new TreeSet((List)cube.keyBasedArray()))); + } else { + List map = cube.keyBasedArray().stream().map((value)->{return (Double) value;}).collect(Collectors.toList()); + double min = Double.MAX_VALUE; + double max = Double.MIN_VALUE; + for(double f : map) { + min = Double.min(min, f); + max = Double.max(max, f); + } + columnMeta.setMin(min); + columnMeta.setMax(max); + } + + } + + public void dumpStats() { + log.info("Dumping Stats"); + try (ObjectInputStream objectInputStream = new ObjectInputStream(new GZIPInputStream(new FileInputStream(COLUMNMETA_FILENAME)));){ + TreeMap metastore = (TreeMap) objectInputStream.readObject(); + Set allIds = (TreeSet) objectInputStream.readObject(); + + long totalNumberOfObservations = 0; + + log.info("\n\nConceptPath\tObservationCount\tMinNumValue\tMaxNumValue\tCategoryValues"); + for(String key : metastore.keySet()) { + ColumnMeta columnMeta = metastore.get(key); + log.info(String.join("\t", key.toString(), columnMeta.getObservationCount()+"", + columnMeta.getMin()==null ? "NaN" : columnMeta.getMin().toString(), + columnMeta.getMax()==null ? "NaN" : columnMeta.getMax().toString(), + columnMeta.getCategoryValues() == null ? "NUMERIC CONCEPT" : String.join(",", + columnMeta.getCategoryValues() + .stream().map((value)->{return value==null ? 
"NULL_VALUE" : "\""+value+"\"";}).collect(Collectors.toList())))); + totalNumberOfObservations += columnMeta.getObservationCount(); + } + + log.info("Total Number of Concepts : " + metastore.size()); + log.info("Total Number of Patients : " + allIds.size()); + log.info("Total Number of Observations : " + totalNumberOfObservations); + + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException("Could not load metastore"); + } + } + + +} diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoader.java new file mode 100644 index 00000000..9bcf414b --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoader.java @@ -0,0 +1,70 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv; + +import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.stream.Stream; + +public class LowRAMMultiCSVLoader { + + private static final Logger log = LoggerFactory.getLogger(LowRAMMultiCSVLoader.class); + private final LowRAMCSVProcessor processor; + private final String inputDir; + + public LowRAMMultiCSVLoader(LowRAMLoadingStore store, LowRAMCSVProcessor processor, String inputDir) { + this.inputDir = inputDir == null ? "/opt/local/hpds_input" : inputDir; + store = store == null ? new LowRAMLoadingStore() : store; + this.processor = processor == null ? new LowRAMCSVProcessor(store, false, 5D) : processor; + Crypto.loadDefaultKey(); + } + + public static void main(String[] args) { + boolean rollUpVarNames = true; + if (args.length > 1) { + if (args[0].equalsIgnoreCase("NO_ROLLUP")) { + log.info("Configured to not roll up variable names"); + rollUpVarNames = false; + } + } + String inputDir = "/opt/local/hpds_input"; + LowRAMLoadingStore store = new LowRAMLoadingStore(); + LowRAMCSVProcessor lowRAMCSVProcessor = new LowRAMCSVProcessor(store, rollUpVarNames, 5D); + int exitCode = new LowRAMMultiCSVLoader(store, lowRAMCSVProcessor, inputDir).processCSVsFromHPDSDir(); + try { + store.saveStore(); + store.dumpStats(); + } catch (IOException | ClassNotFoundException e) { + log.error("Error saving store: ", e); + System.exit(1); + } + System.exit(exitCode); + } + + protected int processCSVsFromHPDSDir() { + // find all files + log.info("Looking for files to process. 
All files must be smaller than 5G"); + log.info("Files larger than 5G should be split into a series of CSVs"); + try (Stream input_files = Files.list(Path.of(inputDir))){ + input_files + .map(Path::toFile) + .filter(File::isFile) + .peek(f -> log.info("Found file {}", f.getAbsolutePath())) + + .filter(f -> f.getName().endsWith(".csv")) + .peek(f -> log.info("Confirmed file {} is a .csv", f.getAbsolutePath())) + + .map(processor::process) + .forEach(status -> log.info("Finished processing file {}", status)); + return 0; + } catch (IOException e) { + log.error("Exception processing files: ", e); + return 1; + } + } + + +} diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/PhenoRecord.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/PhenoRecord.java similarity index 97% rename from etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/PhenoRecord.java rename to etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/PhenoRecord.java index 9380a803..ad89dae5 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/PhenoRecord.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/PhenoRecord.java @@ -1,4 +1,4 @@ -package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype; +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.sequential; import java.sql.ResultSet; import java.sql.SQLException; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoader.java similarity index 68% rename from etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoader.java rename to etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoader.java index a2e970e9..6c37f000 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoader.java @@ -1,4 +1,4 @@ -package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype; +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.sequential; import java.io.*; import java.nio.charset.Charset; @@ -8,6 +8,7 @@ import java.util.*; import java.util.concurrent.ExecutionException; +import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv.LowRAMMultiCSVLoader; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVRecord; import org.apache.commons.io.IOUtils; @@ -45,6 +46,18 @@ public class SequentialLoader { private static long processedRecords = 0; + private static final int PATIENT_NUM = 0; + + private static final int CONCEPT_PATH = 1; + + private static final int NUMERIC_VALUE = 2; + + private static final int TEXT_VALUE = 3; + + private static final int DATETIME = 4; + + private static String HPDS_DIRECTORY = "/opt/local/hpds/"; + public static void main(String[] args) throws IOException, ClassNotFoundException { Crypto.loadDefaultKey(); @@ -71,6 +84,10 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio } log.info("Input files: " + Arrays.deepToString(inputFiles.toArray())); + + if (inputFiles.stream().allMatch(f -> f.endsWith(".csv"))) { + LowRAMMultiCSVLoader.main(args); + } //load each into observation store for(String filename : inputFiles) { @@ -105,13 +122,18 @@ private static void loadCsvFile(String filename) throws 
IOException { //currentConcept is used to persist data across function calls and identify when we hit a new concept final PhenoCube[] currentConcept = new PhenoCube[1]; + int count = 0; for (CSVRecord record : records) { + count++; + if (count % 5000 == 0) { + log.info("Processed {} records for file {}", count, filename); + } if(record.size()<4) { log.warn("Record number " + record.getRecordNumber() + " had less records than we expected so we are skipping it."); continue; - } - processRecord(currentConcept, new PhenoRecord(record)); + } + processRecord(currentConcept, record); } reader.close(); in.close(); @@ -141,6 +163,63 @@ private static void loadTemplate() throws FileNotFoundException, IOException { } } + private static void processRecord(final PhenoCube[] currentConcept, CSVRecord record) { + if(record.size()<4) { + log.info("Record number " + record.getRecordNumber() + + " had less records than we expected so we are skipping it."); + return; + } + + try { + String conceptPathFromRow = record.get(CONCEPT_PATH); + String[] segments = conceptPathFromRow.split("\\\\"); + for(int x = 0;x4 && record.get(DATETIME) != null && ! record.get(DATETIME).isEmpty()) { + date = new Date(Long.parseLong(record.get(DATETIME))); + } + currentConcept[0].add(patientId, isAlpha ? value : Double.parseDouble(value), date); + store.allIds.add(patientId); + } + } catch (ExecutionException e) { + // todo: do we really want to ignore this? + log.error("Error processing record", e); + } + } + private static void processRecord(final PhenoCube[] currentConcept, PhenoRecord record) { if(record == null ) { return; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoadingStore.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoadingStore.java similarity index 99% rename from etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoadingStore.java rename to etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoadingStore.java index c87b6434..370c6a78 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SequentialLoadingStore.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sequential/SequentialLoadingStore.java @@ -1,4 +1,4 @@ -package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype; +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.sequential; import java.io.*; import java.util.*; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SQLLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sql/SQLLoader.java similarity index 96% rename from etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SQLLoader.java rename to etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sql/SQLLoader.java index 01860baa..59433470 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/SQLLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/sql/SQLLoader.java @@ -1,24 +1,19 @@ -package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype; +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.sql; import java.sql.ResultSet; import java.sql.SQLException; -import java.text.ParseException; import java.text.SimpleDateFormat; import java.io.*; import java.nio.charset.Charset; -import java.util.Date; import java.util.Properties; -import java.util.concurrent.ArrayBlockingQueue; import 
java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import com.google.common.cache.CacheLoader.InvalidCacheLoadException; import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; +import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessorTest.java b/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessorTest.java new file mode 100644 index 00000000..e9a5b8f4 --- /dev/null +++ b/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMCSVProcessorTest.java @@ -0,0 +1,186 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv; + + +import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto; +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.TreeMap; +import java.util.zip.GZIPInputStream; + +class LowRAMCSVProcessorTest { + + private static final String TEST_KEY_PATH = "src/test/resources/test_named_encryption_key"; + + + @Test + void shouldProcessSimpleCSV(@TempDir File testDir) throws IOException, ClassNotFoundException { + String content = """ + PATIENT_NUM,CONCEPT_PATH,NUMERIC_VALUE,TEXT_VALUE,DATETIME + 1,\\foo\\1\\,,val,1 + 1,\\foo\\2\\,0,,1 + 2,\\foo\\1\\,,lav,1 + 2,\\foo\\2\\,99,,1 + """; + Path csvPath = Path.of(testDir.getAbsolutePath(), "test.csv"); + Files.writeString(csvPath, content); + LowRAMLoadingStore store = new LowRAMLoadingStore( + testDir.getAbsolutePath() + "/allObservationsTemp.javabin", + testDir.getAbsolutePath() + "/columnMeta.javabin", + testDir.getAbsolutePath() + "/allObservationsStore.javabin", + "TEST_KEY" + ); + + IngestStatus status = new LowRAMCSVProcessor(store, false, 1D).process(csvPath.toFile()); + + Assertions.assertEquals(2, status.conceptCount()); + Assertions.assertEquals(5, status.lineCount()); + Assertions.assertEquals(csvPath, status.file()); + + Crypto.loadKey("TEST_KEY", TEST_KEY_PATH); + store.saveStore(); + List expectedMetas = List.of( + new CategoricalMeta("\\foo\\1\\", 2, 2, List.of("lav", "val")), + new NumericMeta("\\foo\\2\\", 2, 2, 0D, 99D) + ); + verifyStoreMeta(expectedMetas, testDir.getAbsolutePath() + "/columnMeta.javabin"); + } + + @Test + void shouldNotProcessCharAndInt(@TempDir File testDir) throws IOException, ClassNotFoundException { + String content = """ + PATIENT_NUM,CONCEPT_PATH,NUMERIC_VALUE,TEXT_VALUE,DATETIME + 1,\\foo\\1\\,,val,1 + 1,\\foo\\1\\,0,,1 + """; + Path csvPath = Path.of(testDir.getAbsolutePath(), "test.csv"); + Files.writeString(csvPath, content); + LowRAMLoadingStore store = new LowRAMLoadingStore( + testDir.getAbsolutePath() + "/allObservationsTemp.javabin", + testDir.getAbsolutePath() + "/columnMeta.javabin", + testDir.getAbsolutePath() + "/allObservationsStore.javabin", + "TEST_KEY" + ); + + IngestStatus status = new LowRAMCSVProcessor(store, false, 1D).process(csvPath.toFile()); + + Assertions.assertEquals(1, status.conceptCount()); + Assertions.assertEquals(3, status.lineCount()); + 
Assertions.assertEquals(csvPath, status.file()); + + Crypto.loadKey("TEST_KEY", TEST_KEY_PATH); + store.saveStore(); + List expectedMetas = List.of( + new CategoricalMeta("\\foo\\1\\", 1, 1, List.of("val")) // make sure "1" doesn't show up + ); + verifyStoreMeta(expectedMetas, testDir.getAbsolutePath() + "/columnMeta.javabin"); + } + + @Test + void shouldNotProcessIntAndChar(@TempDir File testDir) throws IOException, ClassNotFoundException { + String content = """ + PATIENT_NUM,CONCEPT_PATH,NUMERIC_VALUE,TEXT_VALUE,DATETIME + 1,\\foo\\1\\,0,,1 + 2,\\foo\\1\\,1,,1 + 1,\\foo\\1\\,,val,1 + """; + Path csvPath = Path.of(testDir.getAbsolutePath(), "test.csv"); + Files.writeString(csvPath, content); + LowRAMLoadingStore store = new LowRAMLoadingStore( + testDir.getAbsolutePath() + "/allObservationsTemp.javabin", + testDir.getAbsolutePath() + "/columnMeta.javabin", + testDir.getAbsolutePath() + "/allObservationsStore.javabin", + "TEST_KEY" + ); + + IngestStatus status = new LowRAMCSVProcessor(store, false, 1D).process(csvPath.toFile()); + + Assertions.assertEquals(1, status.conceptCount()); + Assertions.assertEquals(4, status.lineCount()); + Assertions.assertEquals(csvPath, status.file()); + + Crypto.loadKey("TEST_KEY", TEST_KEY_PATH); + store.saveStore(); + List expectedMetas = List.of( + new NumericMeta("\\foo\\1\\", 2, 2, 0.0, 1.0) // make sure "1" doesn't show up + ); + verifyStoreMeta(expectedMetas, testDir.getAbsolutePath() + "/columnMeta.javabin"); + } + + @Test + void shouldProcessLargeFile(@TempDir File testDir) throws IOException, ClassNotFoundException { + Path csvPath = Path.of(testDir.getAbsolutePath(), "test.csv"); + + // Create a ~0.2G file sorted by patient + String header = "PATIENT_NUM,CONCEPT_PATH,NUMERIC_VALUE,TEXT_VALUE,DATETIME\n"; + Files.writeString(csvPath, header); + // 5 char patient col + 22 char concept col + 1 char numeric value + 10 char date + 4 commas + newline = + // 43 chars of ascii-utf8 = 43 bytes per line, except the debugger says 44 so uh 44 it is + // I want a file that splits into 2 chunks. The chunk size is set to 0.1 G, so 2 * (0.1 * 1024^3)/44 + // So we need 4880644 lines of this to make a 0.2G file that splits into 2 chunks + // ... give or take.
I shot a little under + try ( + FileWriter fw = new FileWriter(csvPath.toString(), true); + BufferedWriter writer = new BufferedWriter(fw) + ) { + for (int line = 0; line < 4880000; line++) { + int patient = line / 100; int concept = line % 100; int val = line % 9; String date = "1739329199"; + String row = String.format("%05d", patient) + ",\\my\\concept\\path\\" + String.format("%05d", concept) + "\\," + val + ",," + date + '\n'; + writer.write(row); + if(line % 1000000 == 0) { + System.out.println("Wrote line: " + line); + } + } + } + + LowRAMLoadingStore store = new LowRAMLoadingStore( + testDir.getAbsolutePath() + "/allObservationsTemp.javabin", + testDir.getAbsolutePath() + "/columnMeta.javabin", + testDir.getAbsolutePath() + "/allObservationsStore.javabin", + "TEST_KEY" + ); + + IngestStatus status = new LowRAMCSVProcessor(store, false, 0.1).process(csvPath.toFile()); + Assertions.assertEquals(4880001, status.lineCount()); + Assertions.assertEquals(100, status.conceptCount()); + Assertions.assertEquals(csvPath, status.file()); + + Crypto.loadKey("TEST_KEY", TEST_KEY_PATH); + store.saveStore(); + List expectedMetas = List.of( + new NumericMeta("\\my\\concept\\path\\00000\\", 24399, 24399, 0.0, 8.0), + new NumericMeta("\\my\\concept\\path\\00099\\", 24400, 24400, 0.0, 8.0) + ); + verifyStoreMeta(expectedMetas, testDir.getAbsolutePath() + "/columnMeta.javabin"); + } + + private void verifyStoreMeta(List expectedMetas, String columnMetaPath) throws IOException, ClassNotFoundException { + ObjectInputStream objectInputStream = new ObjectInputStream(new GZIPInputStream(new FileInputStream(columnMetaPath))); + TreeMap metaStore = (TreeMap) objectInputStream.readObject(); + for (Meta expectedMeta : expectedMetas) { + ColumnMeta actualMeta = metaStore.get(expectedMeta.key()); + Assertions.assertNotNull(actualMeta); + Assertions.assertEquals(expectedMeta.patientCount(), actualMeta.getPatientCount()); + Assertions.assertEquals(expectedMeta.conceptCount(), actualMeta.getObservationCount()); + if (expectedMeta instanceof NumericMeta expectedNumeric) { + Assertions.assertEquals(expectedNumeric.min(), actualMeta.getMin()); + Assertions.assertEquals(expectedNumeric.max(), actualMeta.getMax()); + } else if (expectedMeta instanceof CategoricalMeta expectedCategorical) { + Assertions.assertEquals(expectedCategorical.values(), actualMeta.getCategoryValues()); + } + } + } + private sealed interface Meta permits NumericMeta, CategoricalMeta { + String key(); + int patientCount(); + int conceptCount(); + } + private record NumericMeta(String key, int patientCount, int conceptCount, Double min, Double max) implements Meta {} + private record CategoricalMeta(String key, int patientCount, int conceptCount, List values) implements Meta {} +} \ No newline at end of file diff --git a/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoaderTest.java b/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoaderTest.java new file mode 100644 index 00000000..200c214e --- /dev/null +++ b/etl/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/litecsv/LowRAMMultiCSVLoaderTest.java @@ -0,0 +1,48 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Path; + +class 
LowRAMMultiCSVLoaderTest { + + @Test + void shouldFilterOutNonCSVs(@TempDir File testDir) throws IOException { + Path csvPath = Path.of(testDir.getAbsolutePath(), "test.txt"); + RandomAccessFile largeFile = new RandomAccessFile(csvPath.toString(), "rw"); + largeFile.setLength(6L*1024); + + LowRAMCSVProcessor processor = Mockito.mock(LowRAMCSVProcessor.class); + LowRAMLoadingStore store = Mockito.mock(LowRAMLoadingStore.class); + + LowRAMMultiCSVLoader subject = new LowRAMMultiCSVLoader(store, processor, testDir.getAbsolutePath()); + int actual = subject.processCSVsFromHPDSDir(); + + Assertions.assertEquals(0, actual); + Mockito.verify(processor, Mockito.times(0)).process(Mockito.any()); + } + + @Test + void shouldProcessSmallCSVs(@TempDir File testDir) throws IOException { + Path csvPath = Path.of(testDir.getAbsolutePath(), "test.csv"); + RandomAccessFile largeFile = new RandomAccessFile(csvPath.toString(), "rw"); + largeFile.setLength(6L*1024); + + LowRAMCSVProcessor processor = Mockito.mock(LowRAMCSVProcessor.class); + Mockito.when(processor.process(Mockito.any())) + .thenReturn(new IngestStatus(csvPath, 10, 10, 10L)); + LowRAMLoadingStore store = Mockito.mock(LowRAMLoadingStore.class); + + LowRAMMultiCSVLoader subject = new LowRAMMultiCSVLoader(store, processor, testDir.getAbsolutePath()); + int actual = subject.processCSVsFromHPDSDir(); + + Assertions.assertEquals(0, actual); + Mockito.verify(processor, Mockito.times(1)).process(Mockito.any()); + } +} \ No newline at end of file diff --git a/etl/src/test/resources/test_named_encryption_key b/etl/src/test/resources/test_named_encryption_key new file mode 100644 index 00000000..d3720b6d --- /dev/null +++ b/etl/src/test/resources/test_named_encryption_key @@ -0,0 +1 @@ +1111111111111111 \ No newline at end of file diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java index 4b91cece..71a75424 100644 --- a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java +++ b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java @@ -4,7 +4,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore; import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader; import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.VariantMetadataLoader; -import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.CSVLoader; +import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv.CSVLoader; import java.io.IOException;
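For reference, a minimal sketch (not part of the patch) of how the new low-RAM pieces fit together, mirroring the flow of LowRAMMultiCSVLoader.main added above. The wrapper class below is hypothetical, placed in the litecsv package so it can reach the protected processCSVsFromHPDSDir method; the input directory, the 5 GB chunk ceiling, and the rollup flag are the defaults shown in this diff.

    package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.litecsv;

    import java.io.IOException;

    public class LowRAMLoaderSketch {
        public static void main(String[] args) throws IOException, ClassNotFoundException {
            // Temp and final stores default to the /opt/local/hpds/ paths used throughout the patch.
            LowRAMLoadingStore store = new LowRAMLoadingStore();
            // false = no variable-name rollup (the NO_ROLLUP behavior); 5D = chunk ceiling in gigabytes.
            // Each CSV is split into max(1, ceil(fileBytes / (5 * 1024^3))) chunks, and each chunk is
            // sorted by concept path before ingestion to avoid cache thrashing.
            LowRAMCSVProcessor processor = new LowRAMCSVProcessor(store, false, 5D);
            // The constructor calls Crypto.loadDefaultKey(), so the default encryption key must be present.
            // processCSVsFromHPDSDir() handles every *.csv in /opt/local/hpds_input and logs an IngestStatus per file.
            int exitCode = new LowRAMMultiCSVLoader(store, processor, "/opt/local/hpds_input").processCSVsFromHPDSDir();
            store.saveStore(); // compacts the temp observations into allObservationsStore.javabin and writes columnMeta.javabin
            store.dumpStats(); // logs per-concept observation counts, min/max values, and category values
            System.exit(exitCode);
        }
    }

In the Docker image, the same path is exercised by setting LOADER_NAME=LowRAMMultiCSVLoader (and HEAPSIZE as needed) so the existing ENTRYPOINT picks up the LowRAMMultiCSVLoader-jar-with-dependencies.jar copied in by this patch.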