Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def postRequest(url, files=None, fields=None):
    """POST a multipart/form-data request to *url* and return the response.

    Args:
        url: Target URL; it is wrapped in ``URL`` to derive the client and
            the request URI.
        files: File entries passed to ``encode_multipart_formdata``.
            ``None`` (the default) means an empty list.
        fields: Form fields passed to ``encode_multipart_formdata``.
            ``None`` (the default) means an empty list.

    Returns:
        Whatever ``HTTPClient.request`` returns for the POST.
    """
    # None sentinels instead of ``[]`` defaults: a list default is created
    # once and shared by every call, so any mutation by the encoder would
    # leak into later requests.
    files = [] if files is None else files
    fields = [] if fields is None else fields

    content_type, body = encode_multipart_formdata(fields=fields, files=files)
    headers = {'Content-Type': content_type, 'Accept': '*/*'}

    target = URL(url)
    http = HTTPClient.from_url(target)
    return http.request('POST', target.request_uri, body=body, headers=headers)
def get(path="/sub/channel"):
    """Issue a GET for *path* against the host named by ``HTTP_HOST``.

    The host comes from the ``HTTP_HOST`` environment variable and falls
    back to ``'localhost'``. The URL built here is only used to create the
    client; *path* itself is what gets requested.

    Returns:
        The response object returned by the client's ``get``.
    """
    host = os.getenv("HTTP_HOST", 'localhost')
    endpoint = URL("http://%s/%s" % (host, path))
    return HTTPClient.from_url(endpoint).get(path)
if __name__ == "__main__":
    # Small throughput benchmark: fire N GET requests at a local server
    # through one shared client, at most C at a time, and report req/s.
    N = 1000  # total number of requests
    C = 10    # pool size and client concurrency

    url = URL('http://127.0.0.1/index.html')
    qs = url.request_uri

    def run(client):
        """Fetch the benchmark URI once, drain the body, expect HTTP 200."""
        response = client.get(qs)
        response.read()
        assert response.status_code == 200

    client = HTTPClient.from_url(url, concurrency=C)
    group = gevent.pool.Pool(size=C)

    now = time.time()
    try:
        for _ in xrange(N):
            group.spawn(run, client)
        group.join()
        # Stop the clock before teardown so closing is not measured.
        delta = time.time() - now
    finally:
        # The client was never closed before; release its connections
        # even if spawning or joining fails.
        client.close()

    req_per_sec = N / delta
    # Single parenthesised argument prints the same under the Python 2
    # print statement; the "concurrenry" typo in the label is fixed.
    print("request count:%d, concurrency:%d, %f req/s" % (
        N, C, req_per_sec))
def swift_auth_v1(self):
# Performs the "AUTH v1.0" request against self.auth_url.
# NOTE(review): this snippet is cut off inside the raise expression at the
# end; whatever follows (message arguments, success path) is not visible.
# Rewrite any ";" in the user name to ":" before sending it.
self.user = self.user.replace(";", ":")
# Client for the auth endpoint; self.http_timeout is used for both the
# connection timeout and the network timeout.
auth_httpclient = HTTPClient.from_url(
self.auth_url,
connection_timeout=self.http_timeout,
network_timeout=self.http_timeout,
)
# Credentials travel as X-Auth-User / X-Auth-Key request headers.
headers = {'X-Auth-User': self.user,
'X-Auth-Key': self.password}
# Only the path component of the auth URL is passed to the request.
path = urlparse.urlparse(self.auth_url).path
ret = auth_httpclient.request('GET', path, headers=headers)
# Should do something with redirections (301 in my case)
# Any status outside 200-299 (including 3xx redirects) raises SwiftException.
if ret.status_code < 200 or ret.status_code >= 300:
raise SwiftException('AUTH v1.0 request failed on ' +
'%s with error code %s (%s)'
% (str(auth_httpclient.get_base_url()) +
def swift_auth_v1(self):
# Performs the "AUTH v1.0" request against self.auth_url.
# NOTE(review): this snippet is cut off inside the raise expression at the
# end; whatever follows (message arguments, success path) is not visible.
# Rewrite any ";" in the user name to ":" before sending it.
self.user = self.user.replace(";", ":")
# Client for the auth endpoint; self.http_timeout is used for both the
# connection timeout and the network timeout.
auth_httpclient = HTTPClient.from_url(
self.auth_url,
connection_timeout=self.http_timeout,
network_timeout=self.http_timeout,
)
# Credentials travel as X-Auth-User / X-Auth-Key request headers.
headers = {'X-Auth-User': self.user,
'X-Auth-Key': self.password}
# Only the path component of the auth URL is passed to the request.
path = urlparse.urlparse(self.auth_url).path
ret = auth_httpclient.request('GET', path, headers=headers)
# Should do something with redirections (301 in my case)
# Any status outside 200-299 (including 3xx redirects) raises SwiftException.
if ret.status_code < 200 or ret.status_code >= 300:
raise SwiftException('AUTH v1.0 request failed on ' +
'%s with error code %s (%s)'
% (str(auth_httpclient.get_base_url()) +
def execute(self, test_cases, concurrency):
    """Send one HTTP request per test case and hand the result to ``self.check``.

    Args:
        test_cases: Iterable of dicts. Each must contain ``'url'``;
            ``'method'``, ``'headers'`` and ``'body'`` are optional and
            default to ``'GET'``, ``{}`` and ``None``.
        concurrency: Accepted for interface compatibility; this method
            does not read it.

    Raises:
        KeyError: If a test case has no ``'url'`` entry.
    """
    # NOTE(review): the source had lost its indentation; every statement
    # after the ``for`` line is taken to be part of the loop body - confirm.
    for params in test_cases:
        # Build the defaults afresh for every case so that one case's
        # ``headers`` dict is never shared with (or mutated for) the next.
        p = {"method": 'GET', "headers": {}, "body": None}
        p.update(params)

        request = httputil.ParamHTTPRequest(
            p['url'], p['method'], p['headers'], p['body'])

        enable_ssl = request.scheme.lower() == "https"
        ssl_options = {'ssl_version': 3} if enable_ssl else None

        client = geventhttpclient.HTTPClient(
            request.host,
            port=request.port,
            ssl=enable_ssl,
            ssl_options=ssl_options
        )
        try:
            response = client.request(
                request.method,
                request.path,
                body=request.body,
                headers=request.headers,
            )
        finally:
            # Close the client even when the request raises; on success
            # this still happens before the check, as it did originally.
            client.close()

        self.check(
            request=request,
            response=geventHTTPResponse(response)
        )
def _get_client(host, port, **kwargs):
    """Return the cached ``HTTPClient`` for this host, port and option set.

    The cache key is ``host:port:`` followed by the keyword options rendered
    as ``name=value`` pairs in sorted-name order, so the order in which the
    caller spells the options does not matter. A client is constructed and
    stored in ``_client_cache`` only the first time a key is seen.
    """
    option_parts = []
    for name, setting in sorted(kwargs.items()):
        option_parts.append("{0}={1}".format(str(name), str(setting)))

    cache_key = '{0}:{1}:{2}'.format(host, port, ':'.join(option_parts))

    if cache_key in _client_cache:
        return _client_cache[cache_key]

    _client_cache[cache_key] = HTTPClient(host, port, **kwargs)
    return _client_cache[cache_key]
def __init__(self, host, port, username, password):
    """Create the HTTP client and the headers sent with each request.

    Stores a ``geventhttpclient.HTTPClient`` for *host*/*port*, the
    base64-encoded ``username:password`` pair, and a header dict carrying
    the content type and a ``Basic`` authorization value built from it.
    """
    self.client = geventhttpclient.HTTPClient(host, port)

    user_and_password = "%s:%s" % (username, password)
    self.credentials = base64.b64encode(user_and_password)

    headers = {}
    headers['Content-Type'] = 'text/json'
    headers['Authorization'] = 'Basic %s' % self.credentials
    self.headers = headers
def __init__(self, host, port, username, password):
    """Create the HTTP client and the headers sent with each request.

    Stores a ``geventhttpclient.HTTPClient`` for *host*/*port*, the
    base64-encoded ``username:password`` pair, and a header dict carrying
    the content type and a ``Basic`` authorization value built from it.
    """
    self.client = geventhttpclient.HTTPClient(host, port)

    user_and_password = "%s:%s" % (username, password)
    self.credentials = base64.b64encode(user_and_password)

    headers = {}
    headers['Content-Type'] = 'text/json'
    headers['Authorization'] = 'Basic %s' % self.credentials
    self.headers = headers
# NOTE(review): this fragment starts mid-function. The enclosing `def` lines
# (presumably `fetch_avatars` and a nested `_avatar(hash)` helper) and the
# condition guarding the first return are not visible here.
# Return the entry already stored for this hash.
return cache[hash]
# GET /avatar/<hash> with the default image (d) and size (s) as query parameters.
res = http.get(
'/avatar/{hash}?d={default}&s={size}'
.format(hash=hash, size=size, default=default)
)
# The conditional expression binds tighter than the comma, so this is always
# a 2-tuple (hash, body); body is None for any status other than 200.
result = hash, res.read() if res.status_code == 200 else None
# The tuple is stored for non-200 responses as well.
cache[hash] = result
return result
# Create the cache lazily as an attribute on the function object.
if not hasattr(fetch_avatars, 'cache'):
fetch_avatars.cache = {}
# One sub-cache per (size, default) combination.
key = (size, default)
fetch_avatars.cache.setdefault(key, {})
cache = fetch_avatars.cache[key]
http = HTTPClient.from_url('https://www.gravatar.com/')
# Map _avatar over every hash through a Pool created with 20.
pool = Pool(20)
res = pool.map(_avatar, hashes)
# NOTE(review): each entry is a non-empty tuple, so `if i` never filters
# anything out; with b64 set, an entry whose body is None would be passed to
# base64.b64encode - confirm this is intended.
return [(i[0], base64.b64encode(i[1]) if b64 else i[1]) for i in res if i]