Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'path': 'search/tweets.json', 'data': search},
}
# Fetch the tweets upfront, asynchronously
futures = [async_fetch(name, **kwargs) for name, kwargs in tests.items()]
done, not_done = concurrent.futures.wait(futures)
response = {future.name: future.result() for future in done}
for key in ('bad-request', 'search-url', 'search-body', 'show', 'timeline', 'get-ok'):
r = response[key]
# Ignore timeouts. These happen quite often
if r.status_code == CLIENT_TIMEOUT:
continue
if key == 'bad-request':
self.assertEqual(r.status_code, BAD_REQUEST)
continue
self.assertEqual(r.status_code, OK)
result = r.json()
if key in ('search-url', 'search-body'):
# Even if we ask for 2 tweets, Twitter may return less
self.assertLessEqual(len(result['statuses']), 2)
elif key == 'show':
self.assertEqual(result['screen_name'].lower(), 'gramener')
elif key == 'timeline':
self.assertEqual(len(result), 2)
self.assertEqual(result[0]['user']['screen_name'].lower(), 'gramener')
self.assertIn(response['get-redirect'].status_code, {OK, CLIENT_TIMEOUT})
self.assertIn(response['post-fail'].status_code, {METHOD_NOT_ALLOWED, CLIENT_TIMEOUT})
def login(self, user, password, url):
r = self.session.get(url)
self.assertEqual(r.status_code, OK)
tree = lxml.html.fromstring(r.text)
# Create form submission data
data = {'user': user, 'password': password}
# TODO: set this as a cookie
data['_xsrf'] = tree.xpath('.//input[@name="_xsrf"]')[0].get('value')
# Submitting the correct password redirects
return self.session.post(url, timeout=10, data=data)
def eq(method, payload, data, where, b):
response = method(base, params=payload, data=data)
self.assertEqual(response.status_code, OK)
a = pd.read_csv(base + where, encoding='utf-8')
if b is None:
self.assertEqual(len(a), 0)
else:
pdt.assert_frame_equal(a, pd.DataFrame(b))
def test_attributes(self):
self.check('/func/attributes', code=OK)
def logout_ok(self, *args, **kwargs):
check_next = kwargs.pop('check_next')
# logout() does not accept user, password. So Just pass the kwargs
r = self.logout(**kwargs)
eq_(r.status_code, OK)
eq_(r.url, server.base_url + check_next)
def test_xsrf_false(self):
# When xsrf_cookies is set to False, POST works
r = requests.post(server.base_url + '/xsrf/no')
eq_(r.status_code, OK)
def eq(url, method, params, where, y):
# call url?params via method.
response = method(url, params=params)
self.assertEqual(response.status_code, OK)
if y is None:
self.assertEqual(len(requests.get(validation_base + where).text.strip()), 0)
else:
x = pd.read_csv(validation_base + where, encoding='utf-8')
pdt.assert_frame_equal(x, pd.DataFrame(y))
def ne(self, r1, r2):
self.assertTrue(r1.status_code == r2.status_code == OK)
self.assertNotEqual(r1.text, r2.text)
self.set_status(response.code)
else:
self.set_status(response.code, custom_responses.get(response.code))
for header, header_value in response.headers.items():
# We're OK with anything that starts with X-
# Also set MIME type and last modified date
if header.startswith('X-') or header in {'Content-Type', 'Last-Modified'}:
self.set_header(header, header_value)
# Set user's headers
for header, header_value in self.kwargs.get('headers', {}).items():
self.set_header(header, header_value)
# Transform content
content = response.body
if content and response.code == OK:
content = yield gramex.service.threadpool.submit(self.run_transforms, content=content)
# Convert to JSON if required
if not isinstance(content, (six.binary_type, six.text_type)):
content = json.dumps(content, ensure_ascii=True, separators=(',', ':'))
raise tornado.gen.Return(content)
if conf is True:
conf = {}
elif conf is False:
return None
# Get the store. Defaults to the first store in the cache: section
default_store = list(info.cache.keys())[0] if len(info.cache) > 0 else None
store_name = conf.get('store', default_store)
if store_name not in info.cache:
app_log.warning('url: %s: store %s missing', name, store_name)
store = info.cache.get(store_name)
url_cache_key = _get_cache_key(conf, name)
cachefile_class = urlcache.get_cachefile(store)
cache_expiry = conf.get('expiry', {})
cache_statuses = conf.get('status', [OK, NOT_MODIFIED])
cache_expiry_duration = cache_expiry.get('duration', MAXTTL)
# This method will be added to the handler class as "cache", and called as
# self.cache()
def get_cachefile(handler):
return cachefile_class(key=url_cache_key(handler), store=store,
handler=handler, expire=cache_expiry_duration,
statuses=set(cache_statuses))
return get_cachefile