Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
type="ERROR")
if region_bo.file_type == 'gff':
is_gtf = True
else:
is_gtf = False
if is_gtf:
gtf = GTF(inputfile.name, check_ensembl_format=False)
bed_obj = gtf.select_by_key("feature",
ft_type).get_midpoints(name=names.split(","),
sep=separator)
for line in bed_obj:
write_properly(chomp(str(line)), outputfile)
else:
for line in region_bo:
diff = line.end - line.start
if diff % 2 != 0:
# e.g. 10-13 (zero-based) -> 11-13 (one-based)
# midpoint is 12 (one-based) -> 11-12 (zero-based)
# e.g. 949-1100 (zero-based) -> 950-1100 (one-based)
# midpoint is 1025 (one-based) -> 1024-1025 (zero-based)
# floored division (python 2)...
line.end = line.start + int(diff // 2) + 1
line.start = line.end - 1
else:
# e.g 10-14 (zero based) -> 11-14 one based
downstream=1500,
chrom_info=None,
no_strandness=False,
no_annotation=False):
"""
Find transcript with divergent promoters.
"""
message("Using -u " + str(upstream) + ".")
message("Using -d " + str(downstream) + ".")
tx_with_divergent = dict()
dist_to_divergent = dict()
tss_pos = dict()
message("Loading GTF.")
gtf = GTF(inputfile)
message("Getting transcript coordinates.")
tx_feat = gtf.select_by_key("feature",
"transcript")
message("Getting tss coordinates.")
tss_bo = tx_feat.get_tss(name=["transcript_id", "gene_id"],
sep="||")
# get tss position
for i in tss_bo:
tx_id_tss, gn_id_tss = i.name.split("||")
tss_pos[tx_id_tss] = int(i.start)
for line in sorted(lines, key=operator.itemgetter(3)):
tmp_file.write('\t'.join(line))
tmp_file.close()
tss_bo = BedTool(tmp_file.name)
# ----------------------------------------------------------------------
# Get the list of non redundant TSSs
# ----------------------------------------------------------------------
gene_dict = defaultdict(dict)
to_delete = []
message("Looking for redundant TSS (gene-wise).")
for line in tss_bo:
tss = line.start
name = line.name
gene_id, tx_id = name.split("|")
if gene_id in gene_dict:
if tss not in gene_dict[gene_id]:
gene_dict[gene_id][tss] = tx_id
else:
to_delete += [tx_id]
else:
gene_dict[gene_id][tss] = tx_id
message("Deleted transcripts: " + ",".join(to_delete[1:min(10,
# -------------------------------------------------------------------------
#
# Colors orders
#
# -------------------------------------------------------------------------
if color_order is None:
if group_by == 'bwig':
color_order = ",".join(input_file_bwig)
elif group_by == 'tx_classes':
color_order = ",".join(class_list)
elif group_by == 'chrom':
color_order = ",".join(list(input_file_chrom))
else:
message("color_order is undefined.", type="ERROR")
color_order = color_order.split(",")
else:
color_order = color_order.split(",")
color_order_pb = False
if group_by == 'bwig':
if len(color_order) != len(input_file_bwig):
color_order_pb = True
if len(set(color_order)) != len(set(input_file_bwig)):
color_order_pb = True
for co in color_order:
if co not in input_file_bwig:
color_order_pb = True
elif group_by == 'tx_classes':
Takes a GTF as input to search for genes with alternative promoters.
"""
# -------------------------------------------------------------------------
# Create a list of labels.
# Take user input into account
# -------------------------------------------------------------------------
bed_list = [x.name for x in bed_list]
if len(bed_list) != len(set(bed_list)):
message("Found the same BED file several times.",
type="ERROR")
if len(bed_list) < 2:
message("At least two bed files are needed.",
type="ERROR")
message('Checking labels.')
if labels is not None:
labels = labels.split(",")
# Ensure the number of labels is the same as the number of bed files.
if len(labels) != len(bed_list):
message("The number of labels should be the same as the number of"
" bed files.", type="ERROR")
# Ensure labels are non-redundant
if len(labels) > len(set(labels)):
message("Labels must be unique.", type="ERROR")
else:
labels = []
for i in range(len(bed_list)):
elif file_with_values is None:
if key is None or value is None:
message("Key and value are mandatory. Alternatively use -e/t/g/f or -f with -k.",
type="ERROR")
elif file_with_values is not None:
if key is None:
message("Please set -k.", type="ERROR")
if value is not None:
message("The -f and -v arguments are mutually exclusive.", type="ERROR")
# ----------------------------------------------------------------------
# Load file with value
# ----------------------------------------------------------------------
gtf = GTF(inputfile, check_ensembl_format=False)
all_values = gtf.extract_data(key, as_list=True, no_na=True, nr=True)
if log:
feat_before = len(gtf)
if not file_with_values:
value_list = value.split(",")
gtf = gtf.select_by_key(key, value, invert_match)
else:
value_list = []
for line in file_with_values:
cols = line.split("\t")
value_list += [cols[col - 1]]
file_with_values.close()
file_with_values = open(file_with_values.name)
def del_attr(
inputfile=None,
outputfile=None,
key="transcript_id",
reg_exp=False,
invert_match=False):
"""
Delete extended attributes in the target gtf file. attr_list can be a
comma-separated list of attributes.
"""
gtf = GTF(inputfile, check_ensembl_format=False)
if reg_exp:
try:
rgxp = re.compile(key)
except:
message("Check the regular expression please.", type="ERROR")
key_list = [key]
else:
key_list = key.split(",")
for i in gtf:
feature_keys = i.get_attr_names()
if not invert_match:
for k in key_list:
# -----------------------------------------------------------
if matrix is True:
if new_key is not None:
message("--new-key and --matrix are mutually exclusive.",
type="ERROR")
else:
if new_key is None:
message("--new-key is required when --matrix is False.",
type="ERROR")
# -----------------------------------------------------------
# load the GTF
# -----------------------------------------------------------
gtf = GTF(inputfile, check_ensembl_format=False)
# -----------------------------------------------------------
# Check target feature
# -----------------------------------------------------------
feat_list = gtf.get_feature_list(nr=True)
if target_feature is not None:
target_feature_list = target_feature.split(",")
for i in target_feature_list:
if i not in feat_list + ["*"]:
message("Feature " + i + " not found.",
type="ERROR")
else:
target_feature = ",".join(feat_list)
downstream=1500,
chrom_info=None):
"""
Find transcript with convergent tts.
"""
message("Using -u " + str(upstream) + ".")
message("Using -d " + str(downstream) + ".")
tx_to_convergent_nm = dict()
dist_to_convergent = dict()
tts_pos = dict()
message("Loading GTF.")
gtf = GTF(inputfile)
message("Getting transcript coordinates.")
tx_feat = gtf.select_by_key("feature", "transcript")
message("Getting tts coordinates.")
tts_bo = tx_feat.get_tts(name=["transcript_id", "gene_id"],
sep="||")
# get tts position
for i in tts_bo:
tx_id_ov, gn_id_ov = i.name.split("||")
tts_pos[tx_id_ov] = int(i.start)
message("Getting tts coordinates.")
if not token[0].isdigit():
raise GTFtkError("Column 1 of intput file should be an int.")
new_data = self._dll.add_attr_to_pos(self._data,
native_str(input_file.name),
native_str(new_key))
return self._clone(new_data)
if __name__ == "__main__":
    # Small manual demo, executed only when this module is run as a script.
    from pygtftk.utils import get_example_file

    # The first entry returned by get_example_file() is loaded as a GTF.
    example_files = get_example_file()
    gtf = GTF(example_files[0])

    # Presumably selects the records whose "feature" is "transcript";
    # confirm against GTF.__getitem__. Each selected record is written
    # to standard output.
    for transcript in gtf["feature", "transcript"]:
        transcript.write(sys.stdout)