Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def project_dir():
    """Create a mock user directory in which fnet commands would be used.

    A fresh temporary directory is populated with the example tifs
    (``EM_low.tif`` and ``MBP_low.tif``, copied into a ``data``
    sub-directory) and with ``dummymodule.py``, the dummy module containing
    dataset definitions. The temporary directory is then made the current
    working directory while the generator is suspended.

    Yields:
        pathlib.Path: The temporary directory, which is also the current
        working directory for as long as the consumer runs.

    When the generator is resumed or closed, or when setup fails part-way,
    the previous working directory is restored and the temporary directory
    is deleted so repeated runs do not accumulate stale directories.
    """
    path_pre = Path.cwd()
    path_tmp = Path(tempfile.mkdtemp())
    try:
        path_test_dir = Path(__file__).parent
        path_data_dir = path_test_dir.parent / 'data'
        (path_tmp / 'data').mkdir()
        for tif in ('EM_low.tif', 'MBP_low.tif'):
            shutil.copy(path_data_dir / tif, path_tmp / 'data')
        shutil.copy(path_test_dir / 'data' / 'dummymodule.py', path_tmp)
        os.chdir(path_tmp)
        yield path_tmp
    finally:
        # Leave the temp directory before deleting it; removing the current
        # working directory fails on some platforms.
        os.chdir(path_pre)
        shutil.rmtree(path_tmp, ignore_errors=True)
def save(self, image, output_dir, output_name):
    """
    Write an image to ``output_dir / output_name``.

    The output directory is created first, including any missing parents.
    The array is converted with ``cv2.COLOR_RGB2BGR`` before it is passed
    to ``cv2.imwrite``.

    Args:
        image (numpy.ndarray): Image, a 3D array of shape (H, W, 3).
        output_dir (str): Output directory path
        output_name (str): Output name
    """
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    destination = str(target_dir / output_name)
    bgr_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    cv2.imwrite(destination, bgr_image)
# NOTE(review): excerpt from a logger-setup routine; `logger`, `formatter`
# and `kwargs` are defined outside the visible lines.
# stream handler
chdr = logging.StreamHandler()
chdr.setLevel(logging.DEBUG)
chdr.setFormatter(formatter)
logger.addHandler(chdr)
# Optional settings are popped from kwargs: the log directory (default
# "./logs") and the rank (default None).
log_dir = kwargs.pop("log_dir", "./logs")
rank = kwargs.pop("rank", None)
# file handler
if "log_file" in kwargs:
log_file = kwargs.pop("log_file")
log_path = Path(log_dir, log_file).resolve()
if rank is not None:
# Insert the rank in front of the existing extension,
# e.g. "run.log" becomes "run.0.log" for rank 0.
log_path = log_path.with_suffix(f".{rank}{log_path.suffix}")
# Make sure the directory that will hold the log file exists.
Path.mkdir(log_path.parent, parents=True, exist_ok=True)
fhdr = logging.FileHandler(log_path)
fhdr.setLevel(logging.DEBUG)
fhdr.setFormatter(formatter)
logger.addHandler(fhdr)
logger.info(f"begins logging to file: {str(log_path)}")
# Slack handler: only attached when kwargs holds a truthy "slack" entry.
if "slack" in kwargs and kwargs["slack"]:
try:
# The env label is the last path component of log_dir, with
# ":rank<rank>" appended when a rank was supplied.
env = str(Path(log_dir).name)
if rank is not None:
env += f":rank{rank}"
# Unlike the stream and file handlers (DEBUG), this handler is set to INFO.
shdr = SlackClientHandler(env=env)
shdr.setLevel(logging.INFO)
shdr.setFormatter(formatter)
logger.addHandler(shdr)
# NOTE(review): excerpt from the middle of a method; the `if` that owns the
# `else` below, as well as `gene_list` and `query_accessions`, are outside
# the visible lines.
# Reconfigure the gene_list to reflect the existing accession info
self.current_gene_list = gene_list
else:
self.current_gene_list = self.gene_list
# Get GI (stdout) and query sequence (FASTA format)
self.blastn_log.info("Creating directories.")
self.blastn_log.info("Extracting query refseq sequence to a temp.fasta file from BLAST database.")
# Iterate the query accessions numbers
for query in query_accessions:
# The gene name is read from the first element of the first entry stored
# for this accession in self.acc_dict.
gene = self.acc_dict[query][0][0]
gene_path = self.raw_data / Path(gene) / Path('BLAST')
# Create the directories for each gene
# NOTE(review): with exist_ok=True, mkdir does not raise for an existing
# directory, so the "Directory exists" branch is only reached when the
# path exists but is not a directory -- confirm this is the intent.
try:
Path.mkdir(gene_path, exist_ok=True, parents=True)
self.blastn_log.info("Directory created: %s" % gene)
except FileExistsError:
self.blastn_log.info("Directory exists: %s" % gene)
# Save sequence data in FASTA file format and print the gi number
# to stdout with a custom BLAST extraction
# https://www.ncbi.nlm.nih.gov/books/NBK279689/#_cookbook_Custom_data_extraction_and_form_
# TODO-SDH Combine these BLAST extractions???
# The keys of fmt match the replacement fields of the command template below.
fmt = {'query': query, 'temp fasta': str(gene_path / Path('temp.fasta'))}
# Create a temporary fasta file using blastdbcmd
blastdbcmd_query = "blastdbcmd -entry {query} -db refseq_rna -outfmt %f -out {temp fasta}".format(**fmt)
# The whole command line is one string (wrapped in a list) executed through
# the shell; check=True makes a non-zero exit raise CalledProcessError.
try:
blastdbcmd_status = run([blastdbcmd_query], stdout=PIPE,
stderr=PIPE,shell=True, check=True)
except CalledProcessError as err:
# NOTE(review): excerpt from a function body; `config_file`, `burst`,
# `product`, `pol` and `list_of_files` are defined outside the visible lines.
# -------------------------------------------
# 2 read config file
with open(config_file, 'r') as file:
config_dict = json.load(file)
processing_dir = Path(config_dict['processing_dir'])
# Both ARD sub-sections are read here; their use is not visible in this excerpt.
ard = config_dict['processing']['single_ARD']
ard_mt = config_dict['processing']['time-series_ARD']
# -------------------------------------------
# 3 get namespace of directories and check if already processed
# get the burst directory
burst_dir = processing_dir.joinpath(burst)
# get timeseries directory and create if non existent
out_dir = burst_dir.joinpath('Timeseries')
Path.mkdir(out_dir, parents=True, exist_ok=True)
# in case some processing has been done before, check if already processed
# The marker is a dot-file in out_dir named after the product and polarisation.
check_file = out_dir.joinpath(f'.{product}.{pol}.processed')
if Path.exists(check_file):
logger.info(
f'Timeseries of {burst} for {product} in {pol} '
f'polarisation already processed.'
)
# Early return: the output slots carry the placeholder string
# 'already_processed' and the last tuple element is None.
out_files = 'already_processed'
out_vrt = 'already_processed'
return (
burst, list_of_files, out_files, out_vrt, f'{product}.{pol}', None
)
# NOTE(review): excerpt from a configuration-initialisation routine; `CONFIG`,
# `prefix` and `print_dizzy` are defined outside the visible lines, and the
# original nesting of the statements below is not recoverable from this view.
cfg = ConfigParser(allow_no_value=True)
# Resolve the root directory (with "~" expanded) and record the absolute
# location of the default string library back into CONFIG.
root = Path(CONFIG["GLOBALS"]["ROOTDIR"]).expanduser()
str_lib_path = root / CONFIG["GLOBALS"]["DEFAULT_STR_LIB_NAME"]
CONFIG["GLOBALS"]["DEFAULT_STR_LIB"] = str(str_lib_path)
# Create the root directory on first use (parents are not created).
if not root.exists():
Path.mkdir(root)
# Load the existing config file, or write a default one.
if (root / CONFIG["GLOBALS"]["CONFIGFILE"]).exists():
cfg.read(str(root / CONFIG["GLOBALS"]["CONFIGFILE"]))
else:
print_dizzy("config/init: no config file found, creating default config.")
# The default paths are built from the unexpanded ROOTDIR string.
cfg['dizzy'] = {'module_path' : '%s/modules' % CONFIG["GLOBALS"]["ROOTDIR"],
'overwrite_path' : '%s/local' % CONFIG["GLOBALS"]["ROOTDIR"]}
# Mode 'x' is exclusive creation: opening fails if the file already exists.
with (root / CONFIG["GLOBALS"]["CONFIGFILE"]).open('x') as cfile:
cfg.write(cfile)
Path.mkdir(root / "modules", exist_ok=True)
Path.mkdir(root / "local", exist_ok=True)
# exists() follows symlinks, so a path that does not "exist" but is a symlink
# is a dangling link; it is removed before a new link is created.
if not str_lib_path.exists():
if str_lib_path.is_symlink():
str_lib_path.unlink()
# First candidate for the library: a file under `prefix`.
dflt_str_lib_path = Path(prefix + CONFIG["GLOBALS"]["DEFAULT_STR_LIB_NAME"])
if dflt_str_lib_path.exists():
print_dizzy("config/init: creating symlink to std_lib.")
str_lib_path.symlink_to(dflt_str_lib_path.resolve())
# Second candidate: a file under ./lib/ relative to the working directory.
dflt_str_lib_path = Path("./lib/" + CONFIG["GLOBALS"]["DEFAULT_STR_LIB_NAME"])
if dflt_str_lib_path.exists():
print_dizzy("config/init: creating symlink to std_lib.")
str_lib_path.symlink_to(dflt_str_lib_path.resolve())
# Merge the parsed sections into CONFIG, then look up the module path.
CONFIG.update(dict(cfg))
modp = CONFIG['dizzy'].get("module_path")
if modp is None:
def main() -> None:
    """Convert a Praat TextGrid file to an ELAN ``.eaf`` file from the command line.

    Command-line arguments:
        --tg / --textgrid: the input TextGrid file (required).
        --wav: path handed to ``get_first_wav`` to locate the associated
            .wav file (required).
        -o / --outfile: destination of the ELAN file
            (default ``./inferred-aligned.eaf``).

    The directory that will hold the output file is created when missing.
    The TextGrid is then converted to EAF, the wav file is attached as a
    linked file, and the result is written to the output path.
    """
    parser: ArgumentParser = ArgumentParser(description="Converts Praat TextGrid format to ELAN eaf Format.")
    parser.add_argument("--tg", "--textgrid", type=str, help="The input TextGrid format file", required=True)
    parser.add_argument("--wav", type=str, help="The relative path to the .wav file associated with the TextGrid",
                        required=True)
    parser.add_argument("-o", "--outfile", type=str, help="The file path for the ELAN file output",
                        default="./inferred-aligned.eaf")
    arguments = parser.parse_args()

    textgrid_file = arguments.tg
    wav_file = Path(get_first_wav(arguments.wav))
    output_file = Path(arguments.outfile)

    # A Path object is always truthy, so the previous `if not output_file.parent`
    # guard never fired and the directory was never created. Create it
    # unconditionally; exist_ok makes this a no-op when it is already there.
    output_file.parent.mkdir(parents=True, exist_ok=True)

    textgrid = Praat.TextGrid(file_path=textgrid_file)
    elan = textgrid.to_eaf()

    elan.add_linked_file(file_path=str(wav_file.absolute()),
                         relpath=str(wav_file),
                         mimetype=Elan.Eaf.MIMES.get("wav", ""),
                         time_origin=0)

    elan.to_file(output_file)
# NOTE(review): excerpt from a method body; `kwargs`, `columnFilter`,
# `maskFilter`, `gene`, `outDir`, `dataset`, `msaProgram`, `seqFile` and
# `seqType` are defined outside the visible lines, and the loop body is cut
# off at the end of the excerpt.
iterFlag = True
iteration = 0
while iterFlag is True:
set_iter = kwargs['iterations']
iteration += 1
# Create paths for output files
# The directory suffix encodes which filter is active: "_sf_cf" for a column
# filter, "_sf_mf" for a mask filter, plain "_sf" otherwise.
# NOTE(review): if this reassignment runs on every pass of the loop, outDir
# is already a Path on the second pass and `outDir + '...'` would raise
# TypeError -- confirm against the original indentation.
if columnFilter is not None:
outDir = self.raw_data / Path(gene) / Path(outDir + '_sf_cf')
elif maskFilter is not None:
outDir = self.raw_data / Path(gene) / Path(outDir + '_sf_mf')
else:
outDir = self.raw_data / Path(gene) / Path(outDir + '_sf')
Path.mkdir(outDir, parents=True, exist_ok=True)
# Each iteration gets its own "iter_<n>" sub-directory.
iterDir = Path(outDir) / Path('iter_%s' % iteration)
g2_rem_file = str(iterDir / Path('Seqs.Orig.fas.FIXED.Removed_Seq.With_Names')) # Need for all iterations
Path.mkdir(iterDir, parents=True, exist_ok=True)
# Create files for masking
if maskFilter is not None:
g2_aln2mask = str(iterDir / Path('%s.%s.aln.With_Names' % (dataset, msaProgram)))
g2_rprScores = str(iterDir / Path('%s.%s.Guidance2_res_pair_res.scr' % (dataset, msaProgram)))
if iteration == 1:
# seqFile is the given input
G2Cmd = Guidance2Commandline(seqFile=seqFile, msaProgram=msaProgram, seqType=seqType,
outDir=str(iterDir), **kwargs)
print(G2Cmd)
# The command object is stringified and run through the shell, with stderr
# redirected to stdout; a non-zero exit raises CalledProcessError.
subprocess.check_call([str(G2Cmd)], stderr=subprocess.STDOUT, shell=True)
# Copy the Guidance removed seq file and paste it to the home directory
# Creates the rem_file
# Files without any removed don't have the file *.With_Names
# NOTE(review): excerpt from the end of a routine that configures path
# attributes on `cls`; `new`, `project`, `project_path`, `custom`, `Oven` and
# `cls.ncbi_db_repo` are defined outside the visible lines.
# Sub-directories of the NCBI database repository.
cls.windowmaker_files = cls.ncbi_db_repo / Path('blast') / Path('windowmaker_files')
cls.ncbi_taxonomy = cls.ncbi_db_repo / Path('pub') / Path('taxonomy')
cls.ncbi_refseq_release = cls.ncbi_db_repo / Path('refseq') / Path('release')
# Use the basic_project cookie to create the directory structure
# This runs when `new` is truthy or cls.project_path is not an existing directory.
if new or (not Path(cls.project_path).is_dir()):
Kitchen = Oven(project=project, basic_project=True)
Kitchen.bake_the_project(cookie_jar=project_path)
# Use the custom dictionary to set the path variables
# and to make the directories if necessary. This overrides
if custom:
for key, value in custom.items():
setattr(cls, key, value)
# NOTE(review): the check wraps value in Path(str(...)) but mkdir receives
# value itself; this presumably assumes every custom value is already a Path
# object, and parents are not created -- verify against callers.
if not Path(str(value)).is_dir():
Path.mkdir(value, exist_ok=True)
return cls
__author__ = '__apple'
__time__ = '2018/1/17 15:47'
from werkzeug.exceptions import HTTPException
from app import create_app
from app.help.error import APIException
from app.help.error_code import ServerError
from pathlib import Path
# Create the image directory next to this file if it does not exist yet
base_dir = Path(__file__).parent
image_dir = base_dir / 'image'
if not image_dir.exists():
    image_dir.mkdir()

app = create_app()
# Windows compatibility
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# AOP Flask 1.0
# Registered on the Flask app as the handler for every Exception.
@app.errorhandler(Exception)
def framework_error(e):
# An APIException is returned unchanged.
if isinstance(e, APIException):
return e
# A werkzeug HTTPException is re-wrapped as an APIException that carries the
# original status code and description together with the fixed error code 1005.
if isinstance(e, HTTPException):
code = e.code
msg = e.description
error_code = 1005
return APIException(msg, code, error_code)
# NOTE(review): handling of any other exception type is not visible in this
# excerpt (ServerError is imported but unused here) -- confirm in the full file.