Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def postRequest(url, files=None, fields=None):
    """POST ``fields`` and ``files`` to ``url`` as an encoded multipart body.

    The body and its Content-Type value are produced by
    ``encode_multipart_formdata``; the request is sent through an
    ``HTTPClient`` created from ``url`` with ``HTTPClient.from_url``.

    :param url: target address; wrapped in ``URL(...)`` before use.
    :param files: file entries forwarded to ``encode_multipart_formdata``;
        an empty list is used when omitted.
    :param fields: form fields forwarded to ``encode_multipart_formdata``;
        an empty list is used when omitted.
    :returns: the object returned by ``HTTPClient.request`` for the POST.
    """
    # Build fresh lists per call: a mutable default value would be a single
    # object shared by every call of this function.
    if files is None:
        files = []
    if fields is None:
        fields = []
    content_type, body = encode_multipart_formdata(fields=fields, files=files)
    headers = {'Content-Type': content_type, 'Accept': '*/*'}
    url = URL(url)
    http = HTTPClient.from_url(url)
    return http.request('POST', url.request_uri, body=body, headers=headers)
def get_base_url(self):
    """Build a ``URL`` carrying this object's host, port and scheme.

    The scheme is ``PROTO_HTTPS`` when ``self.ssl`` is truthy and
    ``PROTO_HTTP`` otherwise.

    :returns: a new ``URL`` instance with ``host``, ``port`` and ``scheme``
        set.
    """
    url = URL()
    url.host = self.host
    url.port = self.port
    # Conditional expression instead of ``and/or``: the old form silently
    # falls through to PROTO_HTTP if PROTO_HTTPS is ever a falsy value.
    url.scheme = PROTO_HTTPS if self.ssl else PROTO_HTTP
    return url
}
# Credentials used to sign the request: the application's consumer pair and
# the user's token pair taken from ``token_info``.
oauthlib_consumer = oauthlib.Consumer(APP_ID, APP_SECRET)
token = oauthlib.Token(token_info['oauth_token'],
                       token_info['oauth_token_secret'])

# Parameters sent (and signed) with the request: the OAuth fields plus the
# ``locations`` filter value.
params = {
    'oauth_version': "1.0",
    'oauth_nonce': oauthlib.generate_nonce(),
    'oauth_timestamp': int(time.time()),
    'oauth_token': token.key,
    'oauth_consumer_key': oauthlib_consumer.key,
    'locations': '-122.75,36.8,-121.75,37.8'  # San Francisco
}

url = URL('https://stream.twitter.com/1/statuses/filter.json')

# A request previously built here with ``Request.from_consumer_and_token``
# was overwritten before it was ever used, so only the request that is
# actually signed and sent is constructed.
signature_method = oauthlib.SignatureMethod_HMAC_SHA1()
req = oauthlib.Request(method="POST", url=str(url), parameters=params)
req.sign_request(signature_method, oauthlib_consumer, token)

# Send the signed parameters as a form-encoded POST body.
http = HTTPClient.from_url(url)
response = http.request(
    'POST',
    url.request_uri,
    body=req.to_postdata(),
    headers={'Content-Type': 'application/x-www-form-urlencoded',
             'Accept': '*/*'})
def get_client(self, url):
    """Return the client cached for ``url``'s host and port.

    ``url`` may be a ``URL`` instance or anything ``URL(...)`` accepts.
    Clients are stored in ``self.clients`` under the ``(host, port)`` pair;
    on a miss a new one is built with ``HTTPClient.from_url`` using
    ``self.client_args`` and stored before being returned.
    """
    target = url if isinstance(url, URL) else URL(url)
    cache_key = (target.host, target.port)
    try:
        cached = self.clients[cache_key]
    except KeyError:
        # First request for this host/port: create the client and keep it.
        cached = HTTPClient.from_url(target, **self.client_args)
        self.clients[cache_key] = cached
    return cached
def print_friend_username(http, friend_id):
    """Fetch one friend's record through ``http`` and print its username.

    Requests the path ``/<friend_id>`` with ``TOKEN`` stored under
    ``access_token`` on the URL, parses the response as JSON and prints
    ``<username>: <name>``, or a notice when the record has no
    ``username`` key.

    :param http: client whose ``get(request_uri)`` returns a response that
        exposes ``status_code`` and can be read by ``json.load``.
    :param friend_id: identifier appended to ``/`` to form the request path.
    :raises AssertionError: if the response status code is not 200.
    """
    friend_url = URL('/' + str(friend_id))
    friend_url['access_token'] = TOKEN
    # the greenlet will block until a connection is available
    response = http.get(friend_url.request_uri)
    assert response.status_code == 200
    friend = json.load(response)
    # ``in`` and single-argument ``print(...)`` behave the same on Python 2
    # and Python 3; ``dict.has_key`` and the print statement are 2-only.
    if 'username' in friend:
        print('%s: %s' % (friend['username'], friend['name']))
    else:
        print('%s has no username.' % friend['name'])
def from_url(cls, url, **kw):
    """Construct ``cls`` from the host, port and scheme of ``url``.

    ``url`` may be a ``URL`` instance or anything ``URL(...)`` accepts.
    SSL is enabled only when the scheme equals ``PROTO_HTTPS``; for any
    other scheme an ``ssl_options`` keyword, if given, is discarded before
    the remaining keywords are forwarded to ``cls``.
    """
    parsed = url if isinstance(url, URL) else URL(url)
    use_ssl = parsed.scheme == PROTO_HTTPS
    if not use_ssl:
        # SSL options are meaningless for a non-SSL connection.
        kw.pop('ssl_options', None)
    return cls(parsed.host, port=parsed.port, ssl=use_ssl, **kw)
def get_client(self, url):
    """Look up, or lazily create, the client for ``url``'s host and port.

    ``url`` is converted with ``URL(...)`` unless it already is a ``URL``.
    ``self.clients`` maps ``(host, port)`` pairs to clients; a missing
    entry is filled with ``HTTPClient.from_url(url, **self.client_args)``.
    """
    target = url if isinstance(url, URL) else URL(url)
    cache_key = (target.host, target.port)
    try:
        cached = self.clients[cache_key]
    except KeyError:
        # Nothing cached for this host/port yet: build and remember one.
        cached = HTTPClient.from_url(target, **self.client_args)
        self.clients[cache_key] = cached
    return cached
#!/usr/bin/env python
import gevent.pool
import json
from geventhttpclient import HTTPClient
from geventhttpclient.url import URL
if __name__ == "__main__":
# go to http://developers.facebook.com/tools/explorer and copy the access token
TOKEN = ''
url = URL('https://graph.facebook.com/me/friends')
url['access_token'] = TOKEN
# setting the concurrency to 10 allows up to 10 connections to be
# created and reused.
http = HTTPClient.from_url(url, concurrency=10)
response = http.get(url.request_uri)
assert response.status_code == 200
# the response complies with the read protocol. It passes the stream to
# the json parser as it's being read.
data = json.load(response)['data']
def print_friend_username(http, friend_id):
friend_url = URL('/' + str(friend_id))
friend_url['access_token'] = TOKEN