Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_context(request, stubs=None, pkg_path=None, data_dir=None):
stubs = stubs or []
_frozen = [s.frozen for s in stubs]
_fware = [s.firmware.frozen for s in stubs if s.firmware is not None]
_stub_paths = [s.stubs for s in stubs]
_paths = setutils.IndexedSet([*_frozen, *_fware, *_stub_paths])
_context = {
'base': {},
'stubs': {
'stubs': set(stubs),
'paths': _paths,
'datadir': data_dir,
},
'reqs': {
'paths': setutils.IndexedSet([pkg_path]),
'local_paths': set([])
}
}
if request == 'all':
request = ",".join(list(_context.keys()))
mods = request.split(',')
if 'reqs' in mods and 'stubs' in mods:
def _get_context(request, stubs=None, pkg_path=None, data_dir=None):
    """Assemble the context mapping for the requested section(s).

    Args:
        request: Comma-separated section names ('base', 'stubs', 'reqs'),
            or 'all' to select every section.
        stubs: Optional iterable of stub items; each must expose
            ``stubs``, ``frozen`` and ``firmware`` attributes.
        pkg_path: Stored as the single entry of the 'reqs' paths.
        data_dir: Stored under the 'datadir' key of the 'stubs' section.

    Returns:
        dict: When both 'stubs' and 'reqs' are requested, the 'stubs'
        section with the 'reqs' paths appended and 'local_paths' added.
        Otherwise the requested sections merged in request order; unknown
        section names contribute nothing.
    """
    stub_items = stubs or []
    frozen_dirs = [item.frozen for item in stub_items]
    firmware_dirs = [
        item.firmware.frozen
        for item in stub_items
        if item.firmware is not None
    ]
    stub_dirs = [item.stubs for item in stub_items]
    # Order matters: frozen modules, then firmware modules, then base stubs.
    ordered_paths = setutils.IndexedSet(
        frozen_dirs + firmware_dirs + stub_dirs)

    stubs_section = {
        'stubs': set(stub_items),
        'paths': ordered_paths,
        'datadir': data_dir,
    }
    reqs_section = {
        'paths': setutils.IndexedSet([pkg_path]),
        'local_paths': set(),
    }
    sections = {'base': {}, 'stubs': stubs_section, 'reqs': reqs_section}

    wanted = list(sections) if request == 'all' else request.split(',')

    if 'stubs' in wanted and 'reqs' in wanted:
        # Shallow copy: 'paths' is still the shared IndexedSet, which is
        # extended in place with the requirement paths.
        merged = dict(stubs_section)
        merged['paths'].update(reqs_section['paths'])
        merged['local_paths'] = reqs_section['local_paths']
        return merged

    combined = {}
    for name in wanted:
        combined.update(sections.get(name, {}))
    return combined
return _get_context
def test_indexed_set_basic():
    """Check IndexedSet construction, set operators, indexing and slicing."""
    digits = IndexedSet(range(10))
    upper_half = digits & IndexedSet(range(5, 15))
    extended = IndexedSet(upper_half)
    extended |= {10}

    # Construction keeps insertion order and set semantics.
    assert list(digits) == list(range(10))
    assert set(digits) == set(range(10))

    # Intersection and in-place union.
    assert list(upper_half) == list(range(5, 10))
    assert extended == IndexedSet(range(5, 11))
    assert extended[-1] == 10

    # Symmetric difference.
    assert digits ^ upper_half == IndexedSet(range(5))

    # Slicing, including a negative step.
    assert extended[:3] == IndexedSet([5, 6, 7])
    assert extended[2:4:-1] == IndexedSet([8, 7])
assert len(thou) == 996
while len(thou) > 600:
dead_idx_len = len(thou.dead_indices)
dead_idx_count = thou._dead_index_count
thou.pop(0)
new_dead_idx_len = len(thou.dead_indices)
if new_dead_idx_len < dead_idx_len:
assert dead_idx_count > 0
# 124, 109, 95
assert len(thou) == 600
assert thou._dead_index_count == 67
assert not any([thou[i] is _MISSING for i in range(len(thou))])
thou &= IndexedSet(range(500, 503))
assert thou == IndexedSet([501, 502])
return
# TODO: possible optimization when searching for a
# specific project/metric. search for the project name
# slug and metric name in the part of the line before the
# result begins (the jsonl keys are carefully chosen to
# sort nicely)
for line_data in JSONLIterator(f):
metric_name, proj_slug = line_data['metric_name'], line_data['project']
try:
cur_data = metrics_map[metric_name, proj_slug]
except KeyError:
# not a tracked project/metric
continue
if cur_data is None or cur_data['pull_date'] < line_data['pull_date']:
metrics_map[metric_name, proj_slug] = line_data
possible_paths = IndexedSet()
for (metric_name, proj_slug), data in metrics_map.items():
if data is None:
continue
def _visit(path, key, value):
if not isinstance(value, (list, dict)):
possible_paths.add((metric_name,) + path + (key,))
return True
remap(data['result'], visit=_visit)
# TODO: deal with missing metrics
# TODO: output csv or something
''' --cols 'license.total,evcs.*, sloc.TOTAL_* --cols-file
if col.endswith('*'):
pop the segment with the star, fetch up until that point, then fetch/flatten everything underneath
def get_stub_tree(self, stubs) -> Sequence[Path]:
    """Retrieve and order paths to base stubs and any stubs they depend on.

    Paths are ordered as: frozen modules of every stub, then frozen modules
    of every stub's firmware (when it has one), then the base stub paths.
    Duplicates are dropped, keeping the first occurrence.

    Args:
        stubs: List of Stub Items. Each item must expose ``stubs``,
            ``frozen`` and ``firmware`` attributes.

    Returns:
        Paths to all stubs project depends on.

    """
    frozen = [s.frozen for s in stubs]
    fware_mods = [s.firmware.frozen
                  for s in stubs if s.firmware is not None]
    base_stubs = [s.stubs for s in stubs]
    # Build the ordered set from ONE iterable of paths. Unpacking the paths
    # as separate arguments to IndexedSet.update() would hand it each path
    # as if it were an iterable of values, since update() takes iterables.
    stub_tree = setutils.IndexedSet([*frozen, *fware_mods, *base_stubs])
    return list(stub_tree)
os.mkdir(subsyncit_settings_dir)
make_hidden_on_windows_too(subsyncit_settings_dir)
config.db_dir = subsyncit_settings_dir + os.sep + config.args.absolute_local_root_path.replace("/","%47").replace(":","%58").replace("\\","%92") + "/"
if not os.path.exists(config.db_dir):
os.mkdir(config.db_dir)
db = TinyDB(config.db_dir + os.sep + "subsyncit.db", storage=CachingMiddleware(JSONStorage))
state = State(config.db_dir, MyTinyDBTrace(db.table('files')))
with open(config.db_dir + os.sep + "INFO.TXT", "w") as text_file:
text_file.write(config.args.absolute_local_root_path + "is the Subsyncit path that this pertains to")
local_adds_chgs_deletes_queue = IndexedSet()
class NUllObject(object):
    """Do-nothing stand-in exposing ``is_alive``, ``stop`` and ``join``."""

    def is_alive(self):
        """Always report as alive."""
        return True

    def stop(self):
        """Nothing to stop."""
        return None

    def join(self):
        """Nothing to wait for."""
        return None
excluded_filename_patterns = ExcludedPatternNames()
file_system_watcher = NUllObject()
if config.args.do_fs_event_listener:
def dedupe(*input_files):
    """Merge the given history files and return their deduplicated content.

    Every file descriptor is parsed with ``parse_history``. Repeated entries
    are collapsed so that only the last occurrence survives, and the
    remaining entries are emitted sorted by timestamp.
    """
    # Parse each file in turn and gather all entries in encounter order.
    merged = []
    for handle in input_files:
        merged.extend(parse_history(handle))

    # Feed the entries to the IndexedSet back-to-front so the LAST occurrence
    # of a duplicate is the one that is kept, then flip the set back to
    # encounter order. Keeping that order matters for timestamp-less entries
    # coming from the same file, which should stay together.
    merged.reverse()
    unique = IndexedSet(merged)
    unique.reverse()

    # Order by timestamp and render each entry.
    return '\n'.join(
        "#{}\n{}".format(timestamp, cmd)
        for timestamp, cmd in sorted(unique, key=itemgetter(0))
    )
def by_external_id(self, external_id, record_types=None):
    '''Yield any resources fetched from the 'by-external-id' route.

    Note: while the route will return differently depending on how many
    records are returned, this method deliberately flattens that out - it
    will _always_ return a generator, even if only one record is found (or
    none at all).

    Args:
        external_id: value sent as the ``eid`` query parameter.
        record_types: optional value sent as the ``type[]`` query
            parameter; omitted from the request when falsy.

    Yields:
        One wrapped object per record found.

    Raises:
        ASnakeBadReturnCode: if the status code is not 200, 300 or 404.
    '''
    params = {"eid": external_id}
    if record_types:
        params['type[]'] = record_types

    res = self.client.get('by-external-id', params=params)
    status = res.status_code

    if status == 404:
        # Nothing found: end the generator without yielding. A bare return
        # is used because a value returned from a generator is never seen
        # by code that simply iterates over it.
        return
    elif status == 300:  # multiple returns, bare list of uris
        # IndexedSet presumably drops repeated URIs while keeping order.
        yield from (wrap_json_object({"ref": uri}, self.client)
                    for uri in IndexedSet(res.json()))
    elif status == 200:  # single obj, redirects to obj with 303->200
        yield wrap_json_object(res.json(), self.client)
    else:
        raise ASnakeBadReturnCode(
            "by-external-id call returned '{}'".format(status))