Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
return _log_request(treq.patch, url, data=data, **kwargs)
def delete(url, **kwargs):
"""
Wrapper around :meth:`treq.delete` that logs the request.
See :py:func:`treq.delete`
"""
return _log_request(treq.delete, url, **kwargs)
json_content = treq.json_content
content = treq.content
text_content = treq.text_content
__version__ = treq.__version__
def upload_data(self, src, dst):
proxy_id = dst['proxy_id']
proxy_addr = yield get_proxy(proxy_id)
if proxy_addr:
# proxy addr format: (ip, ctrl_port, file_port, stream_ws_port, stream_restful_port)
url = 'https://%s:%d' % (str(proxy_addr[0]), int(proxy_addr[2]))
data = open(src, 'rb')
resp = yield treq.post(url, agent=no_verify_agent(), data=data, stream=True)
if resp:
file_path = yield treq.text_content(resp)
file_url = '%s/%s' % (url, file_path)
return file_url
def upload_data(self, src, dst, port=None):
proxy_id = dst['proxy_id']
proxy_addr = yield get_proxy(proxy_id, port)
if proxy_addr:
# proxy addr format: (ip, ctrl_port, file_port, stream_ws_port, stream_restful_port)
url = 'https://%s:%d' % (str(proxy_addr[0]), int(proxy_addr[2]))
data = open(src, 'rb')
resp = yield treq.post(url, agent=no_verify_agent(), data=data, stream=True)
if resp:
file_path = yield treq.text_content(resp)
file_url = '%s/%s' % (url, file_path)
return file_url