[ALS-7113] Enhance sequential loader
Luke Sikina committed Feb 13, 2025
1 parent d6f1ec4 commit 40cb814
Showing 11 changed files with 776 additions and 44 deletions.
2 changes: 1 addition & 1 deletion etl/pom.xml
@@ -195,7 +195,7 @@
<configuration>
<archive>
<manifest>
-<mainClass>edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.SequentialLoader</mainClass>
+<mainClass>edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.seqcsv.SmallFileSequentialCSVLoader</mainClass>
</manifest>
</archive>
<outputDirectory>${project.basedir}/../docker/pic-sure-hpds-etl</outputDirectory>
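This is the only pom change: the jar manifest's Main-Class now points at the new loader, so `java -jar` on the ETL jar boots SmallFileSequentialCSVLoader directly. An invocation sketch (the jar file name is a placeholder, not taken from this commit; assumes dependencies are bundled or on the classpath):

java -jar docker/pic-sure-hpds-etl/<etl-jar-name>.jar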
CSVLoaderNewSearch.java

@@ -17,16 +17,6 @@ public class CSVLoaderNewSearch {

private static final Logger log = LoggerFactory.getLogger(CSVLoaderNewSearch.class);

-private static final int PATIENT_NUM = 0;
-
-private static final int CONCEPT_PATH = 1;
-
-private static final int NUMERIC_VALUE = 2;
-
-private static final int TEXT_VALUE = 3;
-
-private static final int DATETIME = 4;
-
private static boolean DO_VARNAME_ROLLUP = false;

private static final String HPDS_DIRECTORY = "/opt/local/hpds/";
@@ -56,24 +46,24 @@ private static void initialLoad() throws IOException {

private static void processRecord(final PhenoCube[] currentConcept, CSVRecord record) {
if (record.size() < 4) {
log.info("Record number {} had less records than we expected so we are skipping it.", record.getRecordNumber());
return;
}

-String conceptPath = getSanitizedConceptPath(record);
-String numericValue = record.get(NUMERIC_VALUE);
+String conceptPath = CSVParserUtil.parseConceptPath(record, DO_VARNAME_ROLLUP);
+String numericValue = record.get(CSVParserUtil.NUMERIC_VALUE);
boolean isAlpha = (numericValue == null || numericValue.isEmpty());
-String value = isAlpha ? record.get(TEXT_VALUE) : numericValue;
+String value = isAlpha ? record.get(CSVParserUtil.TEXT_VALUE) : numericValue;
currentConcept[0] = getPhenoCube(currentConcept[0], conceptPath, isAlpha);

if (value != null && !value.trim().isEmpty() &&
((isAlpha && currentConcept[0].vType == String.class) || (!isAlpha && currentConcept[0].vType == Double.class))) {
value = value.trim();
currentConcept[0].setColumnWidth(isAlpha ? Math.max(currentConcept[0].getColumnWidth(), value.getBytes().length) : Double.BYTES);
-int patientId = Integer.parseInt(record.get(PATIENT_NUM));
+int patientId = Integer.parseInt(record.get(CSVParserUtil.PATIENT_NUM));
Date date = null;
-if (record.size() > 4 && record.get(DATETIME) != null && !record.get(DATETIME).isEmpty()) {
-    date = new Date(Long.parseLong(record.get(DATETIME)));
+if (record.size() > 4 && record.get(CSVParserUtil.DATETIME) != null && !record.get(CSVParserUtil.DATETIME).isEmpty()) {
+    date = new Date(Long.parseLong(record.get(CSVParserUtil.DATETIME)));
}
currentConcept[0].add(patientId, isAlpha ? value : Double.parseDouble(value), date);
store.allIds.add(patientId);
@@ -91,28 +81,4 @@ private static PhenoCube getPhenoCube(PhenoCube currentConcept, String conceptPath, boolean isAlpha) {

return currentConcept;
}

-private static String getSanitizedConceptPath(CSVRecord record) {
-    String conceptPathFromRow = record.get(CONCEPT_PATH);
-    String[] segments = conceptPathFromRow.split("\\\\");
-    for (int x = 0; x < segments.length; x++) {
-        segments[x] = segments[x].trim();
-    }
-    conceptPathFromRow = String.join("\\", segments) + "\\";
-    conceptPathFromRow = conceptPathFromRow.replaceAll("\\ufffd", "");
-    String textValueFromRow = record.get(TEXT_VALUE) == null ? null : record.get(TEXT_VALUE).trim();
-    if (textValueFromRow != null) {
-        textValueFromRow = textValueFromRow.replaceAll("\\ufffd", "");
-    }
-    String conceptPath;
-
-    if (DO_VARNAME_ROLLUP) {
-        conceptPath = conceptPathFromRow.endsWith("\\" + textValueFromRow + "\\") ? conceptPathFromRow.replaceAll("\\\\[^\\\\]*\\\\$", "\\\\") : conceptPathFromRow;
-    } else {
-        conceptPath = conceptPathFromRow;
-    }
-    return conceptPath;
-}


}
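Both the old and new code paths above index CSV columns positionally: 0 = patient number, 1 = concept path, 2 = numeric value, 3 = text value, 4 = datetime as epoch milliseconds. Two illustrative rows under that layout (invented data, not from the repository):

1234,\Demographics\Sex\Male\,,Male,1739404800000
1234,\Lab Results\BMI\,27.4,,1739404800000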
CSVParserUtil.java
@@ -0,0 +1,55 @@
package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype;

import org.apache.commons.csv.CSVRecord;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Date;
import java.util.stream.Collectors;

public class CSVParserUtil {
public static final int PATIENT_NUM = 0;
public static final int CONCEPT_PATH = 1;
public static final int NUMERIC_VALUE = 2;
public static final int TEXT_VALUE = 3;
public static final int DATETIME = 4;

public static String parseConceptPath(CSVRecord record, boolean doVarNameRollup) {
String conceptPathFromRow = record.get(CONCEPT_PATH);
conceptPathFromRow = Arrays.stream(conceptPathFromRow.split("\\\\"))
.map(String::trim)
.collect(Collectors.joining("\\")) + "\\";
conceptPathFromRow = stripWeirdUnicodeChars(conceptPathFromRow);

// \\ufffd = �
String textValueFromRow = stripWeirdUnicodeChars(trim(record.get(TEXT_VALUE)));
if (doVarNameRollup && conceptPathFromRow.endsWith("\\" + textValueFromRow + "\\")) {
// This regex deletes the last node from the concept path, i.e. "rolling it up"
return conceptPathFromRow.replaceAll("\\\\[^\\\\]*\\\\$", "\\\\");
} else {
return conceptPathFromRow;
}
}

private static String stripWeirdUnicodeChars(@Nonnull String raw) {
return raw.replaceAll("\\ufffd", "");
}

public static String trim(@Nullable String maybeString) {
return maybeString == null ? "" : maybeString.trim();
}

public static @Nullable Date parseDate(CSVRecord record) {
Date date = null;
try {
if (record.size() > 4 && record.get(DATETIME) != null && !record.get(DATETIME).isEmpty()) {
date = new Date(Long.parseLong(record.get(DATETIME)));
}
} catch (NumberFormatException e) {
return null;
}

return date;
}
}
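A minimal sketch of the new utility in use, assuming only the Apache Commons CSV API already imported above; the sample row and the demo class itself are invented for illustration:

import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.CSVParserUtil;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;

import java.io.StringReader;

public class CSVParserUtilDemo {
    public static void main(String[] args) throws Exception {
        // patientNum, conceptPath, numericValue, textValue, datetime (epoch millis)
        String row = "1234,\\Demographics\\Sex\\Male\\,,Male,1739404800000";
        CSVRecord record = CSVFormat.DEFAULT.parse(new StringReader(row)).iterator().next();

        // prints \Demographics\Sex\Male\ : no rollup, segments just trimmed and re-joined
        System.out.println(CSVParserUtil.parseConceptPath(record, false));
        // prints \Demographics\Sex\ : the trailing node equals TEXT_VALUE, so it is rolled up
        System.out.println(CSVParserUtil.parseConceptPath(record, true));
        // prints a Date for epoch 1739404800000; a malformed long would yield null instead
        System.out.println(CSVParserUtil.parseDate(record));
    }
}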
SequentialLoader.java

@@ -45,6 +45,18 @@ public class SequentialLoader {

private static long processedRecords = 0;

+private static final int PATIENT_NUM = 0;
+
+private static final int CONCEPT_PATH = 1;
+
+private static final int NUMERIC_VALUE = 2;
+
+private static final int TEXT_VALUE = 3;
+
+private static final int DATETIME = 4;
+
+private static String HPDS_DIRECTORY = "/opt/local/hpds/";

public static void main(String[] args) throws IOException, ClassNotFoundException {

Crypto.loadDefaultKey();
@@ -105,13 +117,18 @@ private static void loadCsvFile(String filename) throws IOException {

//currentConcept is used to persist data across function calls and identify when we hit a new concept
final PhenoCube[] currentConcept = new PhenoCube[1];
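// (the one-element array is a mutable holder: processRecord swaps in a fresh
// PhenoCube whenever the concept path changes between rows)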
+int count = 0;
for (CSVRecord record : records) {
-    if(record.size()<4) {
-        log.warn("Record number " + record.getRecordNumber()
-                + " had less records than we expected so we are skipping it.");
-        continue;
-    }
-    processRecord(currentConcept, record);
+    count++;
+    if (count % 5000 == 0) {
+        log.info("Processed {} records for file {}", count, filename);
+    }
+    processRecord(currentConcept, new PhenoRecord(record));
}
reader.close();
in.close();
@@ -141,6 +158,63 @@ private static void loadTemplate() throws FileNotFoundException, IOException {
}
}

private static void processRecord(final PhenoCube[] currentConcept, CSVRecord record) {
if(record.size()<4) {
log.info("Record number " + record.getRecordNumber()
+ " had less records than we expected so we are skipping it.");
return;
}

try {
String conceptPathFromRow = record.get(CONCEPT_PATH);
String[] segments = conceptPathFromRow.split("\\\\");
for(int x = 0;x<segments.length;x++) {
segments[x] = segments[x].trim();
}
conceptPathFromRow = String.join("\\", segments) + "\\";
conceptPathFromRow = conceptPathFromRow.replaceAll("\\ufffd", "");
String textValueFromRow = record.get(TEXT_VALUE) == null ? null : record.get(TEXT_VALUE).trim();
if(textValueFromRow!=null) {
textValueFromRow = textValueFromRow.replaceAll("\\ufffd", "");
}
String conceptPath = conceptPathFromRow.endsWith("\\" +textValueFromRow+"\\") ? conceptPathFromRow.replaceAll("\\\\[^\\\\]*\\\\$", "\\\\") : conceptPathFromRow;
// This is not getDouble because we need to handle null values, not coerce them into 0s
String numericValue = record.get(NUMERIC_VALUE);
if((numericValue==null || numericValue.isEmpty()) && textValueFromRow!=null) {
try {
numericValue = Double.parseDouble(textValueFromRow) + "";
}catch(NumberFormatException e) {

}
}
boolean isAlpha = (numericValue == null || numericValue.isEmpty());
if(currentConcept[0] == null || !currentConcept[0].name.equals(conceptPath)) {
try {
currentConcept[0] = store.loadingCache.get(conceptPath);
} catch(InvalidCacheLoadException e) {
currentConcept[0] = new PhenoCube(conceptPath, isAlpha ? String.class : Double.class);
store.loadingCache.put(conceptPath, currentConcept[0]);
}
}
String value = isAlpha ? record.get(TEXT_VALUE) : numericValue;

if(value != null && !value.trim().isEmpty() && ((isAlpha && currentConcept[0].vType == String.class)||(!isAlpha && currentConcept[0].vType == Double.class))) {
value = value.trim();
currentConcept[0].setColumnWidth(isAlpha ? Math.max(currentConcept[0].getColumnWidth(), value.getBytes().length) : Double.BYTES);
int patientId = Integer.parseInt(record.get(PATIENT_NUM));
Date date = null;
if(record.size()>4 && record.get(DATETIME) != null && ! record.get(DATETIME).isEmpty()) {
date = new Date(Long.parseLong(record.get(DATETIME)));
}
currentConcept[0].add(patientId, isAlpha ? value : Double.parseDouble(value), date);
store.allIds.add(patientId);
}
} catch (ExecutionException e) {
// todo: do we really want to ignore this?
log.error("Error processing record", e);
}
}

private static void processRecord(final PhenoCube[] currentConcept, PhenoRecord record) {
if(record == null ) {
return;
CSVProcessor.java
@@ -0,0 +1,151 @@
package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.seqcsv;

import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube;
import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.CSVParserUtil;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.*;
import java.nio.file.Files;
import java.util.*;
import java.util.stream.Stream;

public class CSVProcessor {

private static final Logger log = LoggerFactory.getLogger(CSVProcessor.class);
private final LowRAMLoadingStore store;
private final boolean doVarNameRollup;
private final double maxChunkSizeGigs;

public CSVProcessor(LowRAMLoadingStore store, boolean doVarNameRollup, double maxChunkSizeGigs) {
this.store = store;
this.doVarNameRollup = doVarNameRollup;
this.maxChunkSizeGigs = maxChunkSizeGigs;
}

public IngestStatus process(File csv) {
long startTime = System.nanoTime();
log.info("Attempting to ingest file {}", csv.getAbsolutePath());
try (Reader r = new FileReader(csv); Stream<String> rawLines = Files.lines(csv.toPath())) {
CSVParser parser = CSVFormat.DEFAULT
.withSkipHeaderRecord()
.withFirstRecordAsHeader()
.parse(new BufferedReader(r));

// we want to read the file in reasonably sized chunks so that we can handle chunks naively
// in memory without going OOM
// to do this, we're going to assume that over the course of thousands of lines, each line
// is more or less the same length
// so we'll just provision n chunks, where n is max(1, file_size/5G)
log.info("Gathering stats about file {}", csv.getName());
int chunks = Math.max(1, (int)Math.ceil((double) csv.length() / (maxChunkSizeGigs*1024*1024*1024)));
final long totalLineCount = rawLines.count();
final long linesPerChunk = totalLineCount / chunks;
log.info(
"File {} is {} bytes and {} lines. Dividing into {} chunks of {} lines each",
csv.getName(), csv.length(), totalLineCount, chunks, linesPerChunk
);
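// Illustrative arithmetic (numbers invented, not from this commit): a 12 GiB file
// with maxChunkSizeGigs = 5 yields chunks = ceil(12 / 5) = 3; if the file holds
// 90,000,000 data lines, linesPerChunk = 30,000,000, so each pass sorts and
// ingests roughly a third of the file.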
long chunkLineCount = 0;
long lineCount = 0;
int chunkCount = 0;
Set<String> concepts = new HashSet<>();
List<CSVRecord> lines = new ArrayList<>();

log.info("Creating chunks");
for (CSVRecord record : parser) {
chunkLineCount++;
lineCount++;
lines.add(record);
if (chunkLineCount > linesPerChunk || lineCount + 1 == totalLineCount ) {
log.info("Finished creating chunk {}", chunkCount);
// sort by concept to prevent cache thrashing when ingesting
// loading each concept in its entirety for a chunk will minimize disk IO and
// let us keep more valuable things in RAM
lines.sort(Comparator.comparing(a -> a.get(1)));
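// e.g. every row for a path like "\Lab Results\BMI\" is now adjacent, so its
// PhenoCube is pulled into the cache once per chunk instead of once per row
// (illustrative path, not from this commit)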
log.info("Finished sorting chunk {}", chunkCount);
Set<String> chunkConcepts = ingest(lines);
concepts.addAll(chunkConcepts);
log.info("Finished ingesting chunk {} with {} unique concepts", chunkCount, chunkConcepts.size());
lines = new ArrayList<>();
chunkLineCount = 0;
chunkCount++;
}
}

return new IngestStatus(
csv.toPath(),
totalLineCount,
concepts.size(),
System.nanoTime() - startTime
);

} catch (IOException e) {
throw new RuntimeException(e);
}
}


private Set<String> ingest(List<CSVRecord> sortedRecords) {
Set<String> concepts = new HashSet<>();
for (CSVRecord record : sortedRecords) {
if (record.size() < 4) {
log.info("Record #{} has too few columns, skipping.", record.getRecordNumber());
continue;
}

String conceptPath = CSVParserUtil.parseConceptPath(record, doVarNameRollup);
IngestFn ingestFn = Strings.isEmpty(record.get(CSVParserUtil.NUMERIC_VALUE)) ? this::ingestNonNumeric : this::ingestNumeric;
Date date = CSVParserUtil.parseDate(record);
int patientId = Integer.parseInt(record.get(CSVParserUtil.PATIENT_NUM));
if (ingestFn.attemptIngest(record, conceptPath, patientId, date)) {
concepts.add(conceptPath);
} else {
log.warn("Could not ingest record #{}", record.getRecordNumber());
}
}
return concepts;
}

@FunctionalInterface
private interface IngestFn {
boolean attemptIngest(CSVRecord record, String path, int patientId, @Nullable Date date);
}

private boolean ingestNumeric(CSVRecord record, String conceptPath, int patientId, Date date) {
PhenoCube<Double> concept = store.loadingCache.getIfPresent(conceptPath);
if (concept == null) {
concept = new PhenoCube<>(conceptPath, Double.class);
concept.setColumnWidth(Double.BYTES);
store.loadingCache.put(conceptPath, concept);
}
try {
String rawNumericValue = CSVParserUtil.trim(record.get(CSVParserUtil.NUMERIC_VALUE));
double parsedValue = Double.parseDouble(rawNumericValue);
concept.add(patientId, parsedValue, date);
return true;
} catch (NumberFormatException e) {
log.warn("Could not parse numeric value in line {}", record);
}
return false;
}

private boolean ingestNonNumeric(CSVRecord record, String conceptPath, int patientId, Date date) {
PhenoCube<String> concept = store.loadingCache.getIfPresent(conceptPath);
if (concept == null) {
concept = new PhenoCube<>(conceptPath, String.class);
store.loadingCache.put(conceptPath, concept);
}
String rawTextValue = CSVParserUtil.trim(record.get(CSVParserUtil.TEXT_VALUE));
if (rawTextValue.isEmpty()) {
return false;
}
concept.setColumnWidth(Math.max(rawTextValue.getBytes().length, concept.getColumnWidth()));
concept.add(patientId, rawTextValue, date);
return true;
}
}
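A usage sketch for the new processor. Only CSVProcessor's constructor and process(File) signatures come from the diff above; the LowRAMLoadingStore constructor, its package, and the input path are assumptions, and the IngestStatus is printed whole because its accessors are not shown here:

import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.seqcsv.CSVProcessor;
import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.seqcsv.IngestStatus;
import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.seqcsv.LowRAMLoadingStore;

import java.io.File;

public class CSVProcessorDemo {
    public static void main(String[] args) {
        // assumed no-arg constructor; the diff only shows the store being consumed
        LowRAMLoadingStore store = new LowRAMLoadingStore();
        // no variable-name rollup; chunks target roughly 5 GiB apiece
        CSVProcessor processor = new CSVProcessor(store, false, 5.0);
        // input path assumed; HPDS_DIRECTORY elsewhere in this commit is /opt/local/hpds/
        IngestStatus status = processor.process(new File("/opt/local/hpds/allConcepts.csv"));
        System.out.println(status); // source path, line count, distinct concepts, elapsed nanos
    }
}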