Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def genome(request, tempdir):
    """Yield a small test ``Genome``, plain or bgzipped per ``request.param``.

    The source FASTA ``tests/data/small_genome.fa`` is (de)compressed in
    place so that it matches the requested flavour, copied into
    ``<tempdir>/<param>/dm3`` and wrapped in a ``Genome``. After the
    consumer is done, a plain FASTA left behind is bgzipped again.
    """
    # "dm3" is a fake name, used for the blacklist test
    genome_name = "dm3"
    plain_fasta = "tests/data/small_genome.fa"
    packed_fasta = plain_fasta + ".gz"
    want_bgzip = request.param == "bgzipped"

    # Bring the on-disk source file into the state this param needs
    packed_present = os.path.exists(packed_fasta)
    if packed_present and not want_bgzip:
        check_call(["gunzip", packed_fasta])
    elif not packed_present and want_bgzip:
        check_call(["bgzip", plain_fasta])

    param_dir = os.path.join(tempdir, request.param)
    target_dir = os.path.join(param_dir, genome_name)
    mkdir_p(target_dir)

    # Stage the matching flavour of the FASTA in the genome directory
    source = packed_fasta if want_bgzip else plain_fasta
    copyfile(source, os.path.join(target_dir, os.path.basename(source)))

    for plugin in init_plugins():
        activate(plugin)

    # hand the fixture value to the consumer
    yield Genome(genome_name, genome_dir=param_dir)

    # Teardown: compress the plain source file again
    if not want_bgzip and os.path.exists(plain_fasta):
        check_call(["bgzip", plain_fasta])
def after_genome_download(self, genome, threads=1, force=False):
    """Build a HISAT2 index for ``genome`` unless one already exists.

    Nothing happens when the ``cmd_ok("hisat2-build")`` check fails.
    Otherwise the index directory is created (after being wiped when
    ``force`` is set) and ``hisat2-build`` is run if that directory holds
    no ``.ht2`` file yet.

    Parameters
    ----------
    genome : object
        Must expose ``props["hisat2"]`` with the keys ``index_dir`` and
        ``index_name``; it is also passed to ``bgunzip_and_name``.
    threads : int, optional
        Value passed to ``hisat2-build -p``.
    force : bool, optional
        If True, remove ``index_dir`` first so the index is built from
        scratch.
    """
    if not cmd_ok("hisat2-build"):
        return

    # Create index dir
    hisat2_props = genome.props["hisat2"]
    index_dir = hisat2_props["index_dir"]
    index_name = hisat2_props["index_name"]
    if force:
        # Start from scratch
        rmtree(index_dir, ignore_errors=True)
    mkdir_p(index_dir)

    if any(fname.endswith(".ht2") for fname in os.listdir(index_dir)):
        # An index is already present; nothing to build
        return

    # unzip genome if zipped and return up-to-date genome name
    bgzip, fname = bgunzip_and_name(genome)
    try:
        # Create index
        cmd = f"hisat2-build -p {threads} {fname} {index_name}"
        run_index_cmd("hisat2", cmd)
    finally:
        # re-zip genome if unzipped; done in ``finally`` so an error while
        # indexing cannot leave the genome decompressed on disk
        bgrezip(bgzip, fname)
def after_genome_download(self, genome, threads=1, force=False):
    """Build a STAR index for ``genome`` unless one already exists.

    Nothing happens when the ``cmd_ok("STAR")`` check fails. Otherwise the
    index directory is created (after being wiped when ``force`` is set)
    and ``STAR --runMode genomeGenerate`` is run if the ``index_name`` path
    does not exist yet.

    Parameters
    ----------
    genome : object
        Must expose ``props["star"]`` with the keys ``index_dir`` and
        ``index_name``; it is also passed to ``bgunzip_and_name``.
    threads : int, optional
        Value passed to ``--runThreadN``.
    force : bool, optional
        If True, remove ``index_dir`` first so the index is built from
        scratch.
    """
    if not cmd_ok("STAR"):
        return

    # Create index dir
    star_props = genome.props["star"]
    index_dir = star_props["index_dir"]
    index_name = star_props["index_name"]
    if force:
        # Start from scratch
        rmtree(index_dir, ignore_errors=True)
    mkdir_p(index_dir)

    if os.path.exists(index_name):
        # An index is already present; nothing to build
        return

    # unzip genome if zipped and return up-to-date genome name
    bgzip, fname = bgunzip_and_name(genome)
    try:
        # Create index; the index directory doubles as the output prefix
        cmd = (
            "STAR --runMode genomeGenerate --runThreadN {} "
            "--genomeFastaFiles {} --genomeDir {} --outFileNamePrefix {}"
        ).format(threads, fname, index_dir, index_dir)
        run_index_cmd("star", cmd)
    finally:
        # re-zip genome if it was unzipped prior; done in ``finally`` so an
        # error while indexing cannot leave the genome decompressed on disk
        bgrezip(bgzip, fname)
def download_and_generate_annotation(genomes_dir, annot_url, localname):
"""download annotation file, convert to intermediate file and generate output files"""
# create output directory if missing
out_dir = os.path.join(genomes_dir, localname)
if not os.path.exists(out_dir):
mkdir_p(out_dir)
# download to tmp dir. Move files on completion.
with TemporaryDirectory(dir=out_dir) as tmpdir:
ext, gz = get_file_info(annot_url)
annot_file = os.path.join(tmpdir, localname + ".annotation" + ext)
urlretrieve(annot_url, annot_file)
# unzip input file (if needed)
if gz:
cmd = "mv {0} {1} && gunzip -f {1}"
sp.check_call(cmd.format(annot_file, annot_file + ".gz"), shell=True)
# generate intermediate file (GenePred)
pred_file = annot_file.replace(ext, ".gp")
if "bed" in ext:
cmd = "bedToGenePred {0} {1}"
bgzip : bool , optional
If set to True the genome FASTA file will be compressed using bgzip.
If not specified, the setting from the configuration file will be used.
"""
self.check_name(name)
link = self.get_genome_download_link(name, mask=mask, **kwargs)
original_name = name
name = safe(name)
localname = get_localname(name, localname)
genomes_dir = os.path.expanduser(genomes_dir)
out_dir = os.path.join(genomes_dir, localname)
if not os.path.exists(out_dir):
mkdir_p(out_dir)
sys.stderr.write(f"Downloading genome from {link}...\n")
# download to tmp dir. Move genome on completion.
# tmp dir is in genome_dir to prevent moving the genome between disks
with TemporaryDirectory(dir=out_dir) as tmp_dir:
fname = os.path.join(tmp_dir, f"{localname}.fa")
# actual download
urlcleanup()
with urlopen(link) as response:
# check available memory vs file size.
available_memory = int(virtual_memory().available)
file_size = int(response.info()["Content-Length"])
# download file in chunks if >75% of memory would be used
cutoff = int(available_memory * 0.75)
def after_genome_download(self, genome, threads=1, force=False):
    """Build a Bowtie2 index for ``genome`` unless one already exists.

    Nothing happens when the ``cmd_ok("bowtie2-build")`` check fails.
    With ``force`` set, the index directory is removed first. The index is
    built from ``genome.filename`` only if the directory contains no
    ``.bt2`` file; ``threads`` is passed to ``bowtie2-build --threads``.
    """
    if not cmd_ok("bowtie2-build"):
        return

    # Locate the index directory and make sure it exists
    settings = genome.props["bowtie2"]
    index_dir = settings["index_dir"]
    index_name = settings["index_name"]
    if force:
        # Wipe any previous index
        rmtree(index_dir, ignore_errors=True)
    mkdir_p(index_dir)

    existing = [entry for entry in os.listdir(index_dir) if entry.endswith(".bt2")]
    if existing:
        return

    # Build the index
    build_cmd = f"bowtie2-build --threads {threads} {genome.filename} {index_name}"
    run_index_cmd("bowtie2", build_cmd)
def manage_config(cmd):
    """Handle a genomepy config sub-command.

    ``"file"`` prints the path of the active config file, ``"show"`` prints
    its contents, and ``"generate"`` writes a fresh copy of the default
    config into the user config directory and makes it the active one.
    Any other value raises ``ValueError``.
    """
    if cmd == "file":
        print(config.config_file)
        return

    if cmd == "show":
        with open(config.config_file) as handle:
            print(handle.read())
        return

    if cmd != "generate":
        raise ValueError(f"Invalid config command: {cmd}")

    # "generate": make sure the user config directory is there
    target_dir = user_config_dir("genomepy")
    if not os.path.exists(target_dir):
        mkdir_p(target_dir)

    target = os.path.join(target_dir, "genomepy.yaml")
    # a pre-existing user config has to go first, otherwise norns would not
    # pick up the default again
    if os.path.exists(target):
        os.unlink(target)

    defaults = norns.config("genomepy", default="cfg/default.yaml")
    template = defaults.config_file

    # Copy the default config verbatim into the user config location
    with open(target, "w") as dst, open(template) as src:
        dst.write(src.read())

    config.config_file = target
    print(f"Created config file {target}")
def after_genome_download(self, genome, threads=1, force=False):
    """Build a BWA index for ``genome`` unless one already exists.

    Nothing happens when the ``cmd_ok("bwa")`` check fails. With ``force``
    set, the index directory is removed first. If the directory contains
    no ``.bwt`` file, ``index_name`` is symlinked to ``genome.filename``
    (when it does not exist yet) and ``bwa index`` is run on it.
    ``threads`` is accepted but not used by this command.
    """
    if not cmd_ok("bwa"):
        return

    # Locate the index directory and make sure it exists
    settings = genome.props["bwa"]
    index_dir = settings["index_dir"]
    index_name = settings["index_name"]
    if force:
        # Wipe any previous index
        rmtree(index_dir, ignore_errors=True)
    mkdir_p(index_dir)

    existing = [entry for entry in os.listdir(index_dir) if entry.endswith(".bwt")]
    if existing:
        return

    # bwa indexes the file it is pointed at, so link the genome into place
    link_missing = not os.path.exists(index_name)
    if link_missing:
        os.symlink(genome.filename, index_name)

    run_index_cmd("bwa", f"bwa index {index_name}")