Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_upload_multiple_releases(self):
    """Upload two archives of the same package and check both are stored.

    For each archive a ``FieldStorage`` carrying the open file is placed in
    the current request's POST data under ``"content"``,
    ``SimpleView.upload_release`` is called, and the response plus the
    resulting layout under ``self.repository`` are asserted.
    """
    for filename in ('Whoosh-2.6.0.tar.gz', 'Whoosh-2.6.0.zip'):
        # The context manager closes the archive even if the upload raises,
        # so the test does not leak an open file handle per iteration.
        with open(join(TEST_RESOURCES, filename), 'rb') as upload_file:
            storage = FieldStorage()
            storage.filename = filename
            storage.file = upload_file
            # Simulate file upload
            request = get_current_request()
            request.POST = {
                "content": storage,
            }
            view = SimpleView(request)
            result = view.upload_release()
        self.assertIsInstance(result, Response)
        self.assertEqual(result.status_int, 200)
        self.assertTrue(exists(join(self.repository, 'Whoosh')))
        self.assertTrue(exists(join(self.repository, 'Whoosh', filename)))
def getCurrent(cls, req=None):
    """Return the language-preference collection for a request, caching it.

    The collection is stored on the request as ``lang_prefs`` so that
    repeated calls for the same request reuse it.

    :param req: the request to inspect; when ``None``, pyramid's
        thread-local current request is used.
    :returns: the value held in ``req.lang_prefs``. When nothing is cached
        yet this is a ``UserLanguagePreferenceCollection`` for an
        authenticated user, otherwise a
        ``LanguagePreferenceCollectionWithDefault`` built from
        ``req.locale_name``.
    """
    # Very very hackish, but this call is costly and frequent.
    # Let's cache it in the request. Useful for view_def use.
    if req is None:
        from pyramid.threadlocal import get_current_request
        req = get_current_request()
    assert req
    # A private sentinel distinguishes "never computed" from any stored
    # value. The former ``is 0`` identity test against an int literal only
    # worked through CPython's small-int caching and emits a SyntaxWarning
    # on Python 3.8+.
    missing = object()
    cached = getattr(req, "lang_prefs", missing)
    if cached is not missing:
        return cached
    # Imported only on the slow path so a cache hit does no import work.
    from pyramid.security import Everyone
    user_id = req.authenticated_userid
    if user_id and user_id != Everyone:
        try:
            req.lang_prefs = UserLanguagePreferenceCollection(user_id)
            return req.lang_prefs
        except Exception:
            # Best effort: report the failure, then fall back to the
            # locale-based default below.
            capture_exception()
    # use my locale negotiator
    locale = req.locale_name
    req.lang_prefs = LanguagePreferenceCollectionWithDefault(locale)
    return req.lang_prefs
def is_valid_for_user(self):
    """Report whether the provider returns a profile image for this user.

    The ``IPASProvider`` adapter registered under ``self.pas_name`` is
    looked up on the current request's registry and asked for a profile
    image using this object's stored data for that provider (an empty dict
    when none is stored). The truthiness of the answer is returned.
    """
    current = get_current_request()
    pas_provider = current.registry.getAdapter(
        current, IPASProvider, name=self.pas_name
    )
    stored_data = self.provider_data.get(self.pas_name, {})
    image = pas_provider.get_profile_image(stored_data)
    return bool(image)
@FormatChecker.cls_checks("accession")
def is_accession_for_server(instance):
    """Format check registered for the ``"accession"`` format.

    Returns ``True`` when ``instance`` matches ``accession_re``. Otherwise
    it returns ``True`` only if the current request's registry holds
    ``test_accession`` under ``ACCESSION_FACTORY`` and ``instance`` matches
    ``test_accession_re``; in every other case it returns ``False``.
    """
    # Unfortunately we cannot access the accessionType here
    if accession_re.match(instance):
        return True
    registry = get_current_request().registry
    uses_test_factory = registry[ACCESSION_FACTORY] is test_accession
    return bool(uses_test_factory and test_accession_re.match(instance))
def task_revoked_signal(request, terminated, signum, expired, **kwaargs):
    """Handle a revoked task by aborting via the current request's ``tm``.

    After ``tm.abort()`` has been called, the ``"closer"`` entry of
    ``celery.pyramid`` is invoked, provided ``celery`` has a ``pyramid``
    attribute at all. None of the handler arguments are used.
    """
    current = get_current_request()
    current.tm.abort()
    if not hasattr(celery, "pyramid"):
        return
    closer = celery.pyramid["closer"]
    closer()
def get_discussion_url(self, request=None, force_v1=False):
"""
from pyramid.request import Request
req = Request.blank('/', base_url=self.discussion.get_base_url())
Celery didn't like this. To revisit once we have virtual hosts
return req.route_url('home', discussion_slug=self.discussion.slug)
Returns the legacy URL route. Currently, /debate/{discussion_slug}
"""
# No request supplied by the caller: fall back to pyramid's thread-local
# current request.
if request is None:
# Shouldn't do this. Method should only be used in context
# of a request!
from pyramid.threadlocal import get_current_request
request = get_current_request()
# TODO: If the route for 'home' is EVER changed, this value MUST be
# synced. KEEP it as 'home' instead of 'new_home', because usage of
# this method is kept mostly for legacy routes that do not exist in
# new front-end yet.
# The thread-local lookup can still leave request as None (presumably when
# running outside a web request, e.g. a Celery task -- TODO confirm). In that
# case the route is built with self.get_frontend_url instead of the request.
# 'oldDebate' is chosen when force_v1 is set, when the discussion does not
# use a landing page, or when the current phase uses the v1 interface;
# otherwise 'homeBare' is used.
if request is None:
if force_v1 or (not is_using_landing_page(self.discussion)) or \
current_phase_use_v1_interface(
self.discussion.timeline_events):
route = self.get_frontend_url(
'oldDebate', slug=self.discussion.slug)
else:
route = self.get_frontend_url(
'homeBare', slug=self.discussion.slug)
else:
# With a request available, the route helper factory from assembl.views is
# imported for use in this branch.
from assembl.views import create_get_route
def annotate_request(request, key, value):
    """Record ``value`` under ``key`` in the request's metrics dict.

    Convenience wrapper around ``request.metrics`` with forgiving behaviour:

        * Passing ``None`` as the request makes the function look up the
          current request through pyramid's threadlocals.
        * A request without a ``metrics`` attribute is silently ignored, so
          callers may use this in contexts that may not be metrics-enabled.
        * When ``key`` is already present, ``value`` is added to the stored
          entry (``+=``) instead of replacing it.
    """
    target = request
    if target is None:
        target = pyramid.threadlocal.get_current_request()
    if target is None:
        # Still no request available: nothing to annotate.
        return
    try:
        metrics = target.metrics
        if key not in metrics:
            metrics[key] = value
        else:
            metrics[key] += value
    except AttributeError:
        # Request is not metrics-enabled; deliberately a no-op.
        pass
# Writes record counts from the application's database to self.stream,
# translating the output through a localizer.
def _Run(self, **values):
# Resolve a localizer for the current request; on any failure fall back to
# FakeLocalizer.
# NOTE(review): the bare except also traps SystemExit/KeyboardInterrupt --
# consider narrowing it to Exception.
try:
localizer = get_localizer(get_current_request())
except:
localizer = FakeLocalizer()
app = self.app
datapool = app.db
conn = datapool.connection
c = conn.cursor()
self.stream.write(u"\n")
# Count the rows of pool_meta; the count is passed to the translated
# message as the "value" mapping entry.
sql = "select count(*) from pool_meta"
c.execute(sql)
rec = c.fetchall()
self.stream.write(localizer.translate(_(u"\n", mapping={u"value": rec[0][0]})))
# NOTE(review): the HTML trailing the next statement is not valid Python. It
# looks like markup displaced from the string literals above -- restore it
# from the original source.
sql = "select count(*) from pool_files"<table><tbody><tr><th>Elements in total</th><td>${value}</td></tr></tbody></table>
def pluralize(self, singular, plural, n, domain=None, mapping=None):
    """Delegate pluralization to the current request's localizer.

    All arguments are forwarded unchanged to the localizer's ``pluralize``
    method, with ``domain`` and ``mapping`` passed by keyword, and its
    result is returned.
    """
    return get_localizer(get_current_request()).pluralize(
        singular,
        plural,
        n,
        domain=domain,
        mapping=mapping,
    )
# Assembles a dict describing the current request: the given level, the
# user's id and address, and request details taken from the WSGI environ.
# NOTE(review): 'extra' is not used in the visible part of this function --
# confirm how it is meant to be merged into the payload.
def ravenCaptureArguments(self, level=None, **extra):
request = get_current_request()
data = {
'level': level,
# NOTE(review): 'd' is not defined anywhere in the visible code -- confirm
# where it is bound.
'user': {
'id': d.get_userid(),
'ip_address': d.get_address(),
},
# Request details read from the WSGI environ, plus the POST data. The full
# environ is included as well under 'env'.
'request': {
'url': request.environ['PATH_INFO'],
'method': request.environ['REQUEST_METHOD'],
'data': request.POST,
'query_string': request.environ['QUERY_STRING'],
'headers': http.get_headers(request.environ),
'env': request.environ,
},
}