Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def file_merge(infiles, outfile=None, header=1, verbose=1):
    '''Merge a list of input files that share the same format.

    The first file is copied in full. For the 2nd and later files the first
    `header` lines are skipped, so the header appears only once in the output.

    infiles: sequence of input file paths; each is opened with anyfile().
    outfile: output path. When falsy, it is derived from the first input by
             inserting "_merged" between its root and its extension.
    header:  number of leading lines to skip in the 2nd and later files.
    verbose: when truthy, print "Merging..." before starting.
    '''
    outfile = outfile or '_merged'.join(os.path.splitext(infiles[0]))
    out_f, outfile = safewfile(outfile)
    try:
        if verbose:
            print("Merging...")
        for i, fn in enumerate(infiles):
            # NOTE: per-file progress is printed regardless of `verbose`.
            print(os.path.split(fn)[1], '...', end='')
            line_no = 0
            in_f = anyfile(fn)
            try:
                if i > 0:
                    # Drop the header lines of every file except the first.
                    for _ in range(header):
                        in_f.readline()
                for line in in_f:
                    out_f.write(line)
                    line_no += 1
            finally:
                # Release the input handle even if reading/writing fails.
                in_f.close()
            print(line_no)
    finally:
        # Always close the output so buffered data is flushed to disk.
        out_f.close()
def _fetch_data(self, outfile, attributes, filters='', header=None, debug=False):
    """Build and send one mart query per species in self.species_li.

    NOTE(review): only the beginning of this method is visible here. The
    code that consumes `con`, `taxid`, `cnt_all` and `err_msg` is not shown,
    so how results are written to `outfile` cannot be confirmed from this
    excerpt.

    outfile:    output path handed to safewfile(); the user is prompted
                unless self.no_confirm is truthy.
    attributes: passed through to self._make_query_xml().
    filters:    passed through to self._make_query_xml().
    header:     optional iterable of column names, written as the first
                tab-separated line.
    debug:      when truthy, log each generated query XML.
    """
    cnt_all = 0
    out_f, outfile = safewfile(outfile, prompt=(not self.no_confirm), default='O')
    if header:
        # Column names go out once, before any per-species data.
        out_f.write('\t'.join(header) + '\n')
    logging.info('Dumping "%s"...' % os.path.split(outfile)[1])
    for species in self.species_li:
        dataset = self.get_dataset_name(species)
        # presumably the third element of a species entry is its taxonomy id
        # -- TODO confirm against the definition of species_li
        taxid = species[2]
        if not dataset:
            # No dataset name for this species: nothing to query.
            continue
        xml = self._make_query_xml(dataset, attributes=attributes, filters=filters)
        if debug:
            logging.info(xml)
        try:
            con = self.query_mart(xml)
        except MartException:
            # Capture the full traceback text; its use is outside this excerpt.
            import traceback
            err_msg = traceback.format_exc()
def write_mapping_file(mapping_generator, outfile, confirm=True):
    """Write each tuple from `mapping_generator` as one tab-separated line.

    OUTPUT is mapping file:
    -------------------------
    Note: you will not know the source of the mapping unless you use
    the optional parameter "add_source=True" to merge_mapping() function
    col0: Ensembl gene ID
    col2 "add_source" == 1:  NCBI ID gene ID from gene2ensembl
    col2 "add_source" == 2:  NCBI ID gene ID from ncbi_list if symbol == ensembl symbol
    (i.e. iterate through ncbi list (for each Ensembl ID) on gene_info file
    and when the symbol found matches the ensembl symbol use this
    NCBI ID if symbols match only once)

    mapping_generator: iterable of tuples; every field is converted with str().
    outfile:           output path handed to safewfile().
    confirm:           forwarded to safewfile() as its `prompt` argument.

    Returns the number of items written.
    """
    print("step 6 start: write file from mapping generator of tuples")
    mapping_file, mapping_filename = safewfile(outfile, prompt=confirm, default='O')
    count = 0
    try:
        for item in mapping_generator:
            count += 1
            mapping_file.write('\t'.join(str(field) for field in item) + "\n")
    finally:
        # Close the file even if the generator or a write raises, so the
        # handle is not leaked and already-written rows are flushed.
        mapping_file.close()
    print("total Ensembl IDs uniquely mapped to NCBI gene ID:", count)
    print("Output file: \"{}\"".format(mapping_filename))
    print("step 6 end\n")
    return count
def _fetch_data(self, outfile, attributes, filters='', header=None):
out_f, outfile = safewfile(outfile, prompt=False, default='O')
if header:
out_f.write('\t'.join(header) + '\n')
failed = []
def do(species_li, keep_failed=True):
cnt_lines_all = 0
cnt_species_success = 0
for count, species in enumerate(species_li):
try:
dataset = self.get_dataset_name(species)
except IndexError:
self.logger.debug("Skip species '%s'", species)
continue
if not dataset:
continue
taxid = species[2]
xml = self._make_query_xml(