Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get(path="/sub/channel"):
url = URL("http://%s/%s" % (os.getenv("HTTP_HOST", 'localhost'), path))
http = HTTPClient.from_url(url)
response = http.get(path)
return response
import time
import gevent.pool
from geventhttpclient import HTTPClient, URL
if __name__ == "__main__":
N = 1000
C = 10
url = URL('http://127.0.0.1/index.html')
qs = url.request_uri
def run(client):
response = client.get(qs)
response.read()
assert response.status_code == 200
client = HTTPClient.from_url(url, concurrency=C)
group = gevent.pool.Pool(size=C)
now = time.time()
for _ in xrange(N):
group.spawn(run, client)
group.join()
#!/usr/bin/env python
from geventhttpclient import HTTPClient, URL
if __name__ == "__main__":
url = URL('http://127.0.0.1:80/100.dat')
http = HTTPClient.from_url(url)
response = http.get(url.request_uri)
assert response.status_code == 200
CHUNK_SIZE = 1024 * 16 # 16KB
with open('/tmp/100.dat', 'w') as f:
data = response.read(CHUNK_SIZE)
while data:
f.write(data)
data = response.read(CHUNK_SIZE)