Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@dogstatsd.timed('xmlrpc.function', tags=['function:release_data'])
@metric
@cache_by_pkg
def release_data(store, package_name, version):
    """Return the metadata of a single release as a dict.

    The release is looked up with ``store.get_package(package_name, version)``.
    When the lookup yields nothing, an empty dict is returned. Otherwise the
    result of ``as_dict()`` is extended with:

    * one list of specifiers per dependency kind, keyed by
      ``dependency.by_val[kind]``,
    * ``classifiers``: the first column of every row returned by
      ``store.get_release_classifiers``,
    * ``package_url``: the package's page URL built from ``package_name``.

    Any ``description_html`` entry is removed before returning.
    """
    info = store.get_package(package_name, version)
    if not info:
        return {}
    info = info.as_dict()
    if "description_html" in info:
        del info['description_html']

    # Group the dependency specifiers by kind. ``dependency.by_val`` is
    # defined outside this block; presumably it maps the stored kind value
    # to its field name -- confirm against its definition.
    dependencies = defaultdict(list)
    for kind, specifier in store.get_release_dependencies(package_name,
                                                          version):
        dependencies[dependency.by_val[kind]].append(specifier)
    info.update(dependencies)

    info['classifiers'] = [
        row[0]
        for row in store.get_release_classifiers(package_name, version)
    ]
    info['package_url'] = 'http://pypi.python.org/pypi/%s' % package_name

    # Hand the assembled metadata back; without an explicit return the
    # function would yield None even though the release was found.
    return info
@dogstatsd.timed('xmlrpc.function', tags=['function:changelog_last_serial'])
@metric
def changelog_last_serial(store):
    """Return the serial of the last changelog event, as reported by the store."""
    last_serial = store.changelog_last_serial()
    return last_serial
@dogstatsd.timed('xmlrpc.function', tags=['function:user_packages'])
@metric
def user_packages(store, user):
    """Return the packages associated with *user*.

    Each row from ``store.get_user_packages(user)`` is flattened into a
    tuple of its values; the tuples are returned as a list.
    """
    packages = []
    for fields in store.get_user_packages(user):
        packages.append(tuple(fields.values()))
    return packages
@dogstatsd.timed('xmlrpc.function', tags=['function:changelog_since_serial'])
@metric
def changelog_since_serial(store, since_serial):
    """Return the changes since the nominated event serial (id).

    Each entry is a ``(name, version, timestamp, action, id)`` tuple, where
    ``timestamp`` is the row's submitted date converted to integer seconds
    with ``time.mktime``.
    """
    events = []
    for row in store.changelog_since_serial(since_serial, full=False):
        submitted = row['submitted_date']
        if not isinstance(submitted, str):
            # Already a date/time object: use it directly.
            stamp = submitted.timetuple()
        else:
            # The date arrived as text and has to be parsed first.
            parsed = datetime.datetime.strptime(submitted,
                                                '%Y-%m-%d %H:%M:%S')
            stamp = parsed.timetuple()
        event = (
            row['name'],
            row['version'],
            int(time.mktime(stamp)),
            row['action'],
            row['id'],
        )
        events.append(event)
    return events
@dogstatsd.timed('xmlrpc.function', tags=['function:changelog'])
@metric
def changelog(store, since, with_ids=False):
    """Return the changelog rows selected by ``store.changelog(since, full=False)``.

    Each entry is a ``(name, version, timestamp, action)`` tuple, where
    ``timestamp`` is the row's submitted date converted to integer seconds
    with ``time.mktime``. When *with_ids* is true the row's ``id`` is
    appended as a fifth element.
    """
    entries = []
    for row in store.changelog(since, full=False):
        submitted = row['submitted_date']
        if not isinstance(submitted, str):
            # Already a date/time object: use it directly.
            stamp = submitted.timetuple()
        else:
            # The date arrived as text and has to be parsed first.
            parsed = datetime.datetime.strptime(submitted,
                                                '%Y-%m-%d %H:%M:%S')
            stamp = parsed.timetuple()
        entry = (row['name'], row['version'], int(time.mktime(stamp)),
                 row['action'])
        entries.append(entry + (row['id'], ) if with_ids else entry)
    return entries
@dogstatsd.timed('xmlrpc.function', tags=['function:package_hosting_mode'])
@metric
def package_hosting_mode(store, package_name):
    """Return the hosting mode the store reports for *package_name*."""
    hosting_mode = store.get_package_hosting_mode(package_name)
    return hosting_mode
@dogstatsd.timed('xmlrpc.function', tags=['function:top_packages'])
@metric
def top_packages(store, num=None):
    """Return ``store.top_packages(num=num)`` unchanged.

    *num* is forwarded as-is; its default of ``None`` is passed through to
    the store, which decides what that means.
    """
    ranking = store.top_packages(num=num)
    return ranking
@dogstatsd.timed('xmlrpc.function', tags=['function:package_roles'])
@metric
@cache_by_pkg
def package_roles(store, package_name):
    """Return the users and roles associated with *package_name*.

    Each row from ``store.get_package_roles(package_name)`` is flattened
    into a tuple of its values; the tuples are returned as a list.
    """
    roles = []
    for fields in store.get_package_roles(package_name):
        roles.append(tuple(fields.values()))
    return roles
@dogstatsd.timed('xmlrpc.function', tags=['function:changed_packages'])
@metric
def changed_packages(store, since):
    """Return ``store.changed_packages(since)`` unchanged."""
    changed = store.changed_packages(since)
    return changed
@dogstatsd.timed('xmlrpc.function', tags=['function:search'])
@metric
def search(store, spec, operator='and'):
    """Search packages matching *spec* and return each hit as a dict.

    *spec* is a mapping of search criteria and *operator* is forwarded to
    ``store.search_packages`` untouched. The ``_pypi_hidden`` criterion is
    always forced to ``'FALSE'``, overriding any value supplied by the
    caller.

    Returns a list with ``row.as_dict()`` for every row the store yields.
    """
    # Work on a shallow copy so the forced filter does not leak back into
    # the caller's dict.
    criteria = dict(spec)
    criteria['_pypi_hidden'] = 'FALSE'
    return [row.as_dict() for row in store.search_packages(criteria, operator)]