def load_data(header):
print(colored('Found %d data lines' % len(header), 'cyan'))
line_groups = group_by(r'^!([^_]+)_', header)
# Load series
series_df = get_df_from_lines(line_groups['Series'], 'Series')
assert len(series_df.index) == 1
missing = REQUIRED_SERIES_FIELDS - set(series_df.columns)
if missing:
cprint('Skip incomplete header: %s column%s missing'
% (', '.join(sorted(missing)), 's' if len(missing) > 1 else ''), 'red')
return
gse_name = series_df['series_geo_accession'][0]
    # Skip multispecies
if '|\n|' in series_df['series_platform_taxid'][0]:
cprint('Skip multispecies', 'red')
return
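A minimal sketch of the header grouping above, assuming the snippet uses funcy's group_by, whose extended function semantics accept a regex and key each item by the first capture group; the header lines are hypothetical:

from funcy import group_by

# Hypothetical SOFT-style header lines; the regex's first group becomes the dict key.
header = ['!Series_title = Example study', '!Sample_geo_accession = GSM12345']
line_groups = group_by(r'^!([^_]+)_', header)
# line_groups['Series'] -> ['!Series_title = Example study']
# line_groups['Sample'] -> ['!Sample_geo_accession = GSM12345']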
def group(events):
return group_by(lambda x: x['repo'], events)
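With a plain callable the same helper buckets items into a defaultdict(list); a short sketch with hypothetical event dicts, again assuming funcy's group_by:

from funcy import group_by

events = [{'repo': 'a/lib', 'type': 'PushEvent'},
          {'repo': 'b/app', 'type': 'ForkEvent'},
          {'repo': 'a/lib', 'type': 'IssuesEvent'}]
grouped = group_by(lambda x: x['repo'], events)
# grouped['a/lib'] -> both 'a/lib' events; grouped['b/app'] -> the single 'b/app' event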
def save_annotation(user_id, data, from_api=False):
# Group samples by platform
sample_to_platform = dict(Sample.objects.filter(id__in=data['values'])
.values_list('id', 'platform_id'))
groups = group_by(lambda (id, _): sample_to_platform[id], data['values'].items())
# Save all annotations and used regexes
for platform_id, annotations in groups.items():
        # Do not allow the same user to annotate the same series twice
series_tag, _ = SeriesTag.objects.get_or_create(
series=data['series'], platform_id=platform_id, tag=data['tag'], created_by_id=user_id,
defaults=dict(header=data['column'], regex=data['regex'],
modified_by_id=user_id, from_api=from_api,
comment=data['comment'])
)
# TODO: check if this can result in sample tags doubling
        # Create all sample tags
sample_tags = SampleTag.objects.bulk_create([
SampleTag(sample_id=sample_id, series_tag=series_tag, annotation=annotation,
created_by_id=user_id, modified_by_id=user_id)
            for sample_id, annotation in annotations
        ])
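Note that lambda (id, _): ... relies on Python 2 tuple-parameter unpacking, which Python 3 removed (PEP 3113); under Python 3 the same grouping would be written roughly as:

# Python 3 sketch of the tuple-unpacking lambda used above.
groups = group_by(lambda pair: sample_to_platform[pair[0]], data['values'].items())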
def group_needles(line_needles):
"""Group line needles by line. [(_, line)] -> [[_]]."""
grouped_needles = sorted(group_by(itemgetter(1), line_needles).iteritems(),
key=itemgetter(0))
return [map(itemgetter(0), ndl) for ndl in pluck(1, grouped_needles)]
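A small usage sketch with hypothetical needles (the function itself is Python 2 code, note dict.iteritems):

needles = [(('term', 'foo'), 2), (('term', 'bar'), 2), (('ref', 'baz'), 5)]
# group_needles(needles) buckets by line number and, in line order, returns
# [[('term', 'foo'), ('term', 'bar')], [('ref', 'baz')]]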
def iterable_per_line(triples):
"""Yield iterables of (key, value mapping), one for each line."""
# Jam all the triples of a file into a hash by line number:
line_map = group_by(lambda (k, v, extent): extent.start.row, triples) # {line: triples}
last_line = max(line_map.iterkeys()) + 1 if line_map else 1
# Pull out the needles for each line, stripping off the extents and
# producing a blank list for missing lines. (The defaultdict returned from
# group_by takes care of the latter.)
return [[(k, v) for (k, v, e) in line_map[line_num]]
for line_num in xrange(1, last_line)]
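For illustration, a sketch with hypothetical namedtuples standing in for whatever extent type the caller actually passes, showing the blank list produced for a line with no triples:

from collections import namedtuple

Pos = namedtuple('Pos', 'row col')
Extent = namedtuple('Extent', 'start end')

triples = [('type', 'int', Extent(Pos(1, 0), Pos(1, 3))),
           ('name', 'x', Extent(Pos(3, 4), Pos(3, 5)))]
# iterable_per_line(triples) -> [[('type', 'int')], [], [('name', 'x')]]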
graph.vertex_properties['actors_on_vertices'][vertex] + \
' ' + str(graph.vertex_properties['pagerank'][vertex])
graph.vertex_properties['pos'] = sfdp_layout(
graph, eweight=graph.edge_properties['weights_on_edges'])
dir_name = 'pagerank/' + \
graph.graph_properties['repo_on_graph'].replace('/', '%') + '/'
os.mkdir(dir_name)
def event_bulk(vertex):
event = graph.vertex_properties['events_on_vertices'][vertex]
return event['created_at'].strftime("%Y-%m-%d %H")
batch_sizes = map(lambda x: len(x[1]), sorted(group_by(
event_bulk, graph.vertices()).items(), key=lambda x: x[0]))
def tail_number(n):
if n == 0:
return batch_sizes[0]
else:
return tail_number(n - 1) + batch_sizes[n]
batch_numbers = map(tail_number, range(len(batch_sizes)))
map(draw_graph_frame, map(
lambda x: (graph, dir_name, x), batch_numbers))
images = [Image.open(dir_name + str(i) + '.png') for i in batch_numbers]
writeGif(dir_name + 'animation.gif', images, duration=0.1)
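For reference, tail_number above is just a running sum over the per-hour bucket sizes, so batch_numbers are cumulative event counts; a minimal sketch of the same computation with itertools.accumulate:

from itertools import accumulate

batch_sizes = [3, 1, 4]                      # hypothetical events per hour
batch_numbers = list(accumulate(batch_sizes))
# [3, 4, 8] -- same result as map(tail_number, range(len(batch_sizes)))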