Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def parse(self, infile):
    '''Yield one parsed document per ``rs`` record found in *infile*.

    The base name of *infile* is printed first. The file is opened with
    ``anyfile`` and split into records by ``rec_handler``; records that do
    not start with ``'rs'`` are ignored. Every remaining record is passed
    to ``self.parse_one_record``: a ``dict`` result is yielded, any other
    result is used as a key in an error tally. After the last record, the
    number of yielded documents and the error tally are printed.

    The input handle is closed when iteration finishes, when an exception
    propagates, or when the generator is closed before exhaustion.
    '''
    print(os.path.split(infile)[1])
    cnt = 0
    err_d = {}
    _f = anyfile(infile)
    try:
        for rec in rec_handler(_f):
            if not rec.startswith('rs'):
                continue
            doc = self.parse_one_record(rec)
            if isinstance(doc, dict):
                cnt += 1
                yield doc
            else:
                # Non-dict results are counted per distinct value.
                err_d[doc] = err_d.get(doc, 0) + 1
    finally:
        # Release the handle on every exit path, including an early
        # generator close by the consumer.
        _f.close()
    print(cnt, err_d)
def file_merge(infiles, outfile=None, header=1, verbose=1):
    '''Merge a list of input files that share the same format.

    The first ``header`` lines are skipped in every file except the first
    one, so the header appears only once in the merged output.

    :param infiles: sequence of input file paths, merged in order.
    :param outfile: output path; when omitted it is derived from the first
                    input by inserting ``_merged`` before its extension
                    (``a.txt`` -> ``a_merged.txt``).
    :param header: number of leading lines to drop from the 2nd and later
                   files.
    :param verbose: when true, print a "Merging..." banner before starting.
                    Per-file progress and the final summary are always
                    printed.

    Input and output handles are closed even if reading or writing fails.
    '''
    outfile = outfile or '_merged'.join(os.path.splitext(infiles[0]))
    # safewfile hands back the open handle together with the path it
    # actually used, which replaces the requested one.
    out_f, outfile = safewfile(outfile)
    if verbose:
        print("Merging...")
    cnt = 0
    try:
        for i, fn in enumerate(infiles):
            print(os.path.split(fn)[1], '...', end='')
            line_no = 0
            in_f = anyfile(fn)
            try:
                if i > 0:
                    # Drop the header of every file after the first.
                    for _ in range(header):
                        in_f.readline()
                for line in in_f:
                    out_f.write(line)
                    line_no += 1
            finally:
                in_f.close()
            cnt += line_no
            print(line_no)
    finally:
        out_f.close()
    print("=" * 20)
    print("Done![total %d lines output]" % cnt)
def test(self, infile):
    '''Collect the ``_parse_GMAF`` result of every ``rs`` record in *infile*.

    The file is opened with ``anyfile`` and split into records by
    ``rec_handler``. Records not starting with ``'rs'`` are skipped. Each
    remaining record is stripped and split into lines, which are passed to
    ``self._parse_rsline`` (result discarded) and then to
    ``self._parse_GMAF``. Every GMAF result, including falsy ones, is
    appended to the returned list; the number of falsy results is printed
    before returning.

    The input handle is closed before the method returns or raises.
    '''
    _f = anyfile(infile)
    gd = []
    err_cnt = 0
    try:
        for rec in rec_handler(_f):
            if not rec.startswith('rs'):
                continue
            lines = rec.strip().split('\n')
            # Called for its effect only; the return value is not used here.
            self._parse_rsline(lines)
            d = self._parse_GMAF(lines)
            if not d:
                err_cnt += 1
            gd.append(d)
    finally:
        _f.close()
    print(err_cnt)
    return gd
def tabfile_feeder(datafile, header=1, sep='\t',
includefn=None,
coerce_unicode=True,
assert_column_no=None):
'''a generator for each row in the file.'''
in_f = anyfile(datafile)
reader = csv.reader(in_f, delimiter=sep)
lineno = 0
try:
for i in range(header):
next(reader)
lineno += 1
for ld in reader:
if assert_column_no:
if len(ld) != assert_column_no:
err = "Unexpected column number:" \
" got {}, should be {}".format(len(ld), assert_column_no)
raise ValueError(err)
if not includefn or includefn(ld):
lineno += 1
if coerce_unicode:
def tabfile_tester(datafile, header=1, sep='\t'):
    '''Read *datafile* end to end with ``csv.reader`` to check it parses.

    :param datafile: path handed to ``anyfile`` for opening.
    :param header: number of leading rows consumed as header rows.
    :param sep: field delimiter passed to ``csv.reader``.

    Nothing is returned. If reading fails, the count of rows read so far is
    printed and the original exception is re-raised. The file is closed in
    all cases.
    '''
    in_f = anyfile(datafile)
    reader = csv.reader(in_f, delimiter=sep)
    lineno = 0
    try:
        for _ in range(header):
            next(reader)
            lineno += 1
        for _ in reader:
            lineno += 1
    except BaseException:
        # Report the position for any failure (interrupts included), then
        # let the original exception propagate unchanged.
        print("Error at line number:", lineno)
        raise
    finally:
        in_f.close()
def __init__(self, infile):
    '''Store the input path and open it through ``anyfile``.

    The path is kept on ``self.infile`` and the opened handle on
    ``self.in_f``.
    '''
    self.infile = infile
    self.in_f = anyfile(infile)
def data_generator(input_file, version, include_gnomad):
open_file = anyfile(input_file)
db_nsfp = csv.reader(open_file, delimiter="\t")
index = next(db_nsfp)
assert len(index) == VALID_COLUMN_NO, "Expecting %s columns, but got %s" % (VALID_COLUMN_NO, len(index))
previous_row = None
for row in db_nsfp:
df = dict(zip(index, row))
# use transpose matrix to have 1 row with N 187 columns
current_row = _map_line_to_json(df, version=version, include_gnomad=include_gnomad)
if previous_row and current_row:
if current_row["_id"] == previous_row["_id"]:
aa = previous_row["dbnsfp"]["aa"]
if not isinstance(aa, list):
aa = [aa]
aa.append(current_row["dbnsfp"]["aa"])
previous_row["dbnsfp"]["aa"] = aa
if len(previous_row["dbnsfp"]["aa"]) > 1: