Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
flowcellFastqDir
flowcellBamDir
calls:
FastqPairToBwaBamFlow
supplies:
bamFile
fastq1File
fastq2File
"""
#
# 1. separate fastqs into matching pairs:
#
# Walk self.params.flowcellFastqDir for ".fastq.gz" files and group them in
# `fqs`, keyed by (project, sample, index, lane, num).
# Assumes FileDigger (defined elsewhere) yields one (project, sample, path)
# tuple per matching file found under Project_*/Sample_* dirs -- TODO confirm.
# NOTE(review): the code that fills the [None, None] slot pair for each key is
# not visible in this fragment.
fqs = {}
fqDigger = FileDigger(".fastq.gz", ["Project_", "Sample_"])
for (project, sample, fqPath) in fqDigger.getNextFile(self.params.flowcellFastqDir) :
# Skip samples not in sampleNameList, but only when that list is set and non-empty.
if (self.params.sampleNameList != None) and \
(len(self.params.sampleNameList) != 0) and \
(sample not in self.params.sampleNameList) : continue
fqFile = os.path.basename(fqPath)
# The filename stem (text before the first '.') must split on '_' into exactly
# five fields: sample, index, lane, read, num.
w = (fqFile.split(".")[0]).split("_")
if len(w) != 5 :
raise Exception("Unexpected fastq filename format: '%s'" % (fqPath))
(sample2, index, lane, read, num) = w
# The sample parsed from the filename must match the Sample_ directory it was found in.
# NOTE(review): the message below closes the quoted path with ';' instead of "'"
# -- looks like a typo in the error text.
if sample != sample2 :
raise Exception("Fastq name sample disagrees with directory sample: '%s;" % (fqPath))
# `read` is deliberately left out of the key, presumably so that the read-1 and
# read-2 files of one pair collect under the same entry.
key = (project, sample, index, lane, num)
if key not in fqs : fqs[key] = [None, None]
# NOTE(review): these lines appear to be spliced in from the merge step (step 2),
# not the fastq-pairing loop above. They use `bams`, `mergedBams`,
# `sampleTasks`, `mergedSampleDir`, `taskPrefix` and `dependencies`, none of
# which that loop defines.
# Point self.params at this key's BAM list and merged output path, record that
# path in mergedBams, and keep the task returned by mergeBamListFlow.
self.params.mergeBamList = bams[key]
self.params.mergeBamName = os.path.join(mergedSampleDir, sample + mergedBamExt)
mergedBams[key] = self.params.mergeBamName
outTaskPrefix = preJoin(taskPrefix, "_".join(key))
sampleTasks[key] = mergeBamListFlow(self, outTaskPrefix, dependencies)
# Return early when mergedBamDir is not an existing directory (presumably
# because there are then no merged BAMs to mark-dup).
if not os.path.isdir(mergedBamDir) : return
#
# 3) mark dup:
#
# mergedBams contains all bams from the current run; we also add any left over
# from a previous interrupted run:
#
# Scan mergedBamDir for files ending in mergedBamExt (FileDigger, defined
# elsewhere, is assumed to walk Project_*/Sample_* subdirs -- TODO confirm).
mergedBamDigger = FileDigger(mergedBamExt, ["Project_", "Sample_"])
for (project, sample, bamFile) in mergedBamDigger.getNextFile(mergedBamDir) :
key = (project, sample)
# A path already recorded for this key in this run must equal the one on disk.
# NOTE(review): assert is skipped under `python -O`; an explicit raise would
# keep this consistency check in optimized runs.
if key in mergedBams :
assert (mergedBams[key] == bamFile)
else :
mergedBams[key] = bamFile
# NOTE(review): nextWait and totalCores are not used within this visible fragment.
nextWait = set()
totalCores = self.getNCores()
for sampleKey in mergedBams.keys() :
# Depend on this sample's merge task if one was created in this run; otherwise
# (e.g. a BAM found on disk from an earlier run) start with no dependencies.
markDupDep = set()
if sampleKey in sampleTasks : markDupDep = sampleTasks[sampleKey]
fullName = "_".join(sampleKey)
supplies:
mergeBamList
mergeBamName
"""
#
# 1) get a list of bams associated with each project/sample combination:
#
# TODO: what if there's an NFS delay updating all the bams while
# we're reading them out here? make this process more robust -- we
# should know how many BAMs we're expecting, in a way that's
# robust to interruption/restart
#
# Build bams[(project, sample)] -> list of ".bam" paths found under
# self.params.allFlowcellDir. FileDigger (defined elsewhere) is given four
# directory prefixes and yields 5-tuples; the flowcell and "bam" levels are
# not used in the key.
bams = {}
bamDigger = FileDigger(".bam", ["Flowcell_", "bam", "Project_", "Sample_"])
for (flowcell, nothing, project, sample, bamFile) in bamDigger.getNextFile(self.params.allFlowcellDir) :
# Skip samples not in sampleNameList, but only when that list is set and non-empty.
if (self.params.sampleNameList != None) and \
(len(self.params.sampleNameList) != 0) and \
(sample not in self.params.sampleNameList) : continue
key = (project, sample)
if key not in bams : bams[key] = []
bams[key].append(bamFile)
# Filename suffixes for the per-sample merged BAM and (presumably) its
# duplicate-marked counterpart.
mergedBamExt = ".merged.bam"
markDupBamExt = ".markdup.bam"
#
# 2) merge and delete smaller bams:
#
# Filled with (project, sample) -> merged BAM path as merge tasks are set up.
mergedBams = {}