Improve KING/plink2 implementation
bbimber committed Feb 24, 2025
1 parent 02e696e commit cc27375
Showing 3 changed files with 82 additions and 136 deletions.
@@ -2,9 +2,13 @@

import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import org.labkey.api.pipeline.PipelineJobException;
import org.labkey.api.pipeline.PipelineJobService;
import org.labkey.api.sequenceanalysis.run.AbstractCommandWrapper;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
* User: bimber
@@ -22,4 +26,33 @@ public static File getBcfToolsPath()
{
return SequencePipelineService.get().getExeForPackage("BCFTOOLSPATH", "bcftools");
}

public static boolean isBcftoolsFound()
{
return BcftoolsRunner.resolveFileInPath("bcftools", null, false) != null;
}

public void doIndex(File vcf) throws PipelineJobException
{
List<String> args = new ArrayList<>();
args.add(getBcfToolsPath().getAbsolutePath());
args.add("index");
args.add("-t");
args.add("-f");
args.add("-n");

if (!PipelineJobService.get().isWebServer())
{
Integer threads = SequencePipelineService.get().getMaxThreads(getLogger());
if (threads != null)
{
args.add("--threads");
args.add(String.valueOf(threads));
}
}

args.add(vcf.getAbsolutePath());

execute(args);
}
}
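
The new doIndex helper shells out to bcftools index -t -f, adding --threads when running on a remote pipeline node rather than the webserver. A minimal sketch of a call site (the VCF path and surrounding context are hypothetical):

// Hypothetical usage of the new helper; the path is illustrative only.
File vcf = new File("/data/pipeline/variants.vcf.gz");

if (BcftoolsRunner.isBcftoolsFound())
{
    // Writes variants.vcf.gz.tbi alongside the input, replacing any stale index
    new BcftoolsRunner(log).doIndex(vcf);
}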
@@ -36,6 +36,7 @@
import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
import org.labkey.api.sequenceanalysis.SequenceDataProvider;
import org.labkey.api.sequenceanalysis.model.Readset;
import org.labkey.api.sequenceanalysis.pipeline.BcftoolsRunner;
import org.labkey.api.sequenceanalysis.pipeline.ReferenceGenome;
import org.labkey.api.sequenceanalysis.pipeline.SamtoolsCramConverter;
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
@@ -267,8 +268,16 @@ public File ensureVcfIndex(File vcf, Logger log, boolean forceRecreate) throws I
//note: there is a bug in htsjdk's index creation with gz inputs
if (gz.isType(vcf) && !SystemUtils.IS_OS_WINDOWS)
{
TabixRunner r = new TabixRunner(log);
r.execute(vcf);
// preferentially use bcftools since it supports multithreading:
if (BcftoolsRunner.isBcftoolsFound())
{
new BcftoolsRunner(log).doIndex(vcf);
}
else
{
new TabixRunner(log).execute(vcf);
}

if (!expectedIdx.exists())
{
throw new PipelineJobException("Expected index was not created: " + expectedIdx.getPath());
@@ -1,5 +1,6 @@
package org.labkey.sequenceanalysis.run.variant;

import com.google.common.io.Files;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.samtools.SAMSequenceRecord;
import htsjdk.samtools.util.Interval;
@@ -10,10 +11,7 @@
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import org.json.JSONObject;
import org.labkey.api.collections.CaseInsensitiveHashMap;
import org.labkey.api.pipeline.PipelineJobException;
import org.labkey.api.reader.Readers;
import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
import org.labkey.api.sequenceanalysis.pipeline.AbstractVariantProcessingStepProvider;
import org.labkey.api.sequenceanalysis.pipeline.PedigreeToolParameterDescriptor;
import org.labkey.api.sequenceanalysis.pipeline.PipelineContext;
@@ -26,18 +24,11 @@
import org.labkey.api.sequenceanalysis.run.AbstractCommandPipelineStep;
import org.labkey.api.sequenceanalysis.run.AbstractCommandWrapper;
import org.labkey.api.util.Compress;
import org.labkey.api.writer.PrintWriters;
import org.labkey.sequenceanalysis.pipeline.ProcessVariantsHandler;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KingInferenceStep extends AbstractCommandPipelineStep<KingInferenceStep.KingWrapper> implements VariantProcessingStep
{
@@ -50,7 +41,7 @@ public static class Provider extends AbstractVariantProcessingStepProvider<KingI
{
public Provider()
{
super("KingInferenceStep", "KING/Relatedness", "", "This will run KING to infer kinship from a VCF", List.of(
super("KingInferenceStep", "KING/Relatedness", "", "This will run KING (via plink2) to infer kinship from a VCF", List.of(
ToolParameterDescriptor.create("limitToChromosomes", "Limit to Chromosomes", "If checked, the analysis will include only the primary chromosomes", "checkbox", new JSONObject()
{{
put("checked", true);
@@ -148,7 +139,24 @@ public Output processVariants(File inputVCF, File outputDirectory, ReferenceGeno
plinkArgs1.add("--out");
plinkArgs1.add(plinkOut.getPath());

plink.execute(plinkArgs1);
File doneFile = new File(plinkOut.getPath() + ".done");
output.addIntermediateFile(doneFile);
if (doneFile.exists())
{
getPipelineCtx().getLogger().debug("plink has completed, will not repeat");
}
else
{
plink.execute(plinkArgs1);

try
{
Files.touch(doneFile);
}
catch (IOException e)
{
throw new PipelineJobException(e);
}
}

File plinkOutBed = new File(plinkOut.getPath() + ".bed");
if (!plinkOutBed.exists())
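
The argument lists for the two plink2 calls are assembled above the visible hunks; only the --out additions are shown. For orientation, a rough sketch of what the two invocations amount to, using standard plink2 flags (the real step likely passes additional filtering options not visible here):

// Sketch only: the actual argument construction is elided above this hunk.
// Step 1: convert the VCF into plink binary format (<out>.bed/.bim/.fam)
List<String> step1 = Arrays.asList(
        "plink2",                      // resolved executable path in the real code
        "--vcf", inputVCF.getPath(),
        "--make-bed",
        "--out", plinkOut.getPath());

// Step 2: compute KING-robust kinship estimates, written to <out>.kin0
List<String> step2 = Arrays.asList(
        "plink2",
        "--bfile", plinkOut.getPath(),
        "--make-king-table",
        "--out", plinkOutKing.getPath());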
@@ -163,7 +171,23 @@
plinkArgs2.add("--out");
plinkArgs2.add(plinkOutKing.getPath());

plink.execute(plinkArgs2);
doneFile = new File(plinkOutKing.getPath() + ".done");
output.addIntermediateFile(doneFile);
if (doneFile.exists())
{
getPipelineCtx().getLogger().debug("plink has completed, will not repeat");
}
else
{
plink.execute(plinkArgs2);

try
{
Files.touch(doneFile);
}
catch (IOException e)
{
throw new PipelineJobException(e);
}
}

File plinkOutKingFile = new File(plinkOutKing.getPath() + ".kin0");
if (!plinkOutKingFile.exists())
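
Both plink2 invocations are now guarded by the same checkpoint pattern: skip the call if a .done marker exists, otherwise run it and touch the marker so a resumed job does not repeat the work. The same idea as a small reusable helper (an illustration, not part of this commit):

// Illustration only; this helper does not exist in the commit.
private interface CheckpointedTask
{
    void run() throws PipelineJobException;
}

private void runOnce(File doneFile, CheckpointedTask task) throws PipelineJobException
{
    if (doneFile.exists())
    {
        getPipelineCtx().getLogger().debug("step has completed, will not repeat");
        return;
    }

    task.run();

    try
    {
        Files.touch(doneFile);   // com.google.common.io.Files, as imported above
    }
    catch (IOException e)
    {
        throw new PipelineJobException(e);
    }
}

// Usage: runOnce(doneFile, () -> plink.execute(plinkArgs1));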
@@ -188,131 +212,11 @@
throw new PipelineJobException(e);
}

output.addSequenceOutput(plinkOutKingFileTxt, "PLINK2 Relatedness: " + inputVCF.getName(), "PLINK2 Kinship", null, null, genome.getGenomeId(), "Total lines: " + lineCount);

// Also with KING:
KingWrapper wrapper = new KingWrapper(getPipelineCtx().getLogger());
wrapper.setWorkingDir(outputDirectory);

List<String> kingArgs = new ArrayList<>();
kingArgs.add(wrapper.getExe().getPath());

kingArgs.add("-b");
kingArgs.add(plinkOutBed.getPath());

kingArgs.add("--prefix");
kingArgs.add(SequenceAnalysisService.get().getUnzippedBaseName(inputVCF.getName()));

// Update the pedigree / fam file:
String demographicsProviderName = getProvider().getParameterByName(PedigreeToolParameterDescriptor.NAME).extractValue(getPipelineCtx().getJob(), getProvider(), getStepIdx());
if (demographicsProviderName != null)
{
File pedFile = ProcessVariantsHandler.getPedigreeFile(getPipelineCtx().getSourceDirectory(true), demographicsProviderName);
if (!pedFile.exists())
{
throw new PipelineJobException("Unable to find pedigree file: " + pedFile.getPath());
}

File kingFam = createFamFile(pedFile, new File(plinkOutBed.getParentFile(), "plink.fam"));
kingArgs.add("--fam");
kingArgs.add(kingFam.getPath());

output.addIntermediateFile(kingFam);
}

if (threads != null)
{
kingArgs.add("--cpus");
kingArgs.add(threads.toString());
}

kingArgs.add("--kinship");
kingArgs.add("--rplot");

File kinshipOutput = new File(outputDirectory, SequenceAnalysisService.get().getUnzippedBaseName(inputVCF.getName()) + ".kin");
wrapper.execute(kingArgs);
if (!kinshipOutput.exists())
{
throw new PipelineJobException("Unable to find file: " + kinshipOutput.getPath());
}

File kinshipOutputTxt = new File(kinshipOutput.getPath() + ".txt.gz");
if (kinshipOutputTxt.exists())
{
kinshipOutputTxt.delete();
}

lineCount = SequencePipelineService.get().getLineCount(kinshipOutput)-1;
try
{
Compress.compressGzip(kinshipOutput, kinshipOutputTxt);
FileUtils.delete(kinshipOutput);
}
catch (IOException e)
{
throw new PipelineJobException(e);
}

output.addSequenceOutput(kinshipOutputTxt, "King Relatedness: " + inputVCF.getName(), "KING Relatedness", null, null, genome.getGenomeId(), "Total lines: " + lineCount);
output.addSequenceOutput(plinkOutKingFileTxt, "PLINK2/KING Relatedness: " + inputVCF.getName(), "PLINK2/KING Kinship", null, null, genome.getGenomeId(), "Total lines: " + lineCount);

return output;
}

private File createFamFile(File pedFile, File famFile) throws PipelineJobException
{
File newFamFile = new File(famFile.getParentFile(), "king.fam");

Map<String, String> pedMap = new CaseInsensitiveHashMap<>();
try (BufferedReader reader = Readers.getReader(pedFile))
{
String line;
while ((line = reader.readLine()) != null)
{
String[] tokens = line.split(" ");
if (tokens.length != 6)
{
throw new PipelineJobException("Improper ped line length: " + tokens.length);
}

pedMap.put(tokens[1], StringUtils.join(Arrays.asList("0", tokens[1], tokens[2], tokens[3], tokens[4], "-9"), "\t"));
}
}
catch (IOException e)
{
throw new PipelineJobException(e);
}

try (BufferedReader reader = Readers.getReader(famFile);PrintWriter writer = PrintWriters.getPrintWriter(newFamFile))
{
String line;
while ((line = reader.readLine()) != null)
{
String[] tokens = line.split("\t");
if (tokens.length != 6)
{
throw new PipelineJobException("Improper ped line length: " + tokens.length);
}

String newRow = pedMap.get(tokens[1]);
if (newRow == null)
{
getPipelineCtx().getLogger().warn("Unable to find pedigree entry for: " + tokens[1] + ", reusing original");
writer.println(line);
}
else
{
writer.println(newRow);
}
}
}
catch (IOException e)
{
throw new PipelineJobException(e);
}

return newFamFile;
}

public static class KingWrapper extends AbstractCommandWrapper
{
public KingWrapper(@Nullable Logger logger)
