Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
response.status_code.__eq__.side_effect = lambda x: x == 500
response.headers.__contains__.reset_mock()
response.headers.__contains__.side_effect = lambda x: x == 'content-type'
response.headers.get.side_effect = lambda x, default_value: "application/json" if x == 'content-type' else None
response.json.return_value = {"reason": retryErrorMessages[0]}
_with_retry(function, **retryParams)
assert_true(response.headers.get.called)
assert_equals(function.call_count, 1 + 4 + 3 + 4)
# -- Propagate an error up --
print("Expect a SynapseError: Bar")
def foo():
raise SynapseError("Bar")
function.side_effect = foo
assert_raises(SynapseError, _with_retry, function, **retryParams)
assert_equals(function.call_count, 1 + 4 + 3 + 4 + 1)
def restDELETE(self, uri, endpoint=None, headers=None, retryPolicy=None, **kwargs):
    """
    Performs a REST DELETE operation to the Synapse server.

    :param uri:         URI of resource to be deleted
    :param endpoint:    Server endpoint, defaults to self.repoEndpoint
    :param headers:     Dictionary of headers to use rather than the API-key-signed default set of headers
    :param retryPolicy: Dictionary of retry settings handed to ``self._build_retry_policy``; the policy
                        it returns is expanded into keyword arguments for ``_with_retry``.
                        ``None`` (the default) is treated as an empty dictionary.
    :param kwargs:      Any other arguments taken by a `requests <http://docs.python-requests.org/en/latest/>`_ method

    :returns: Nothing; the response is only passed to ``exceptions._raise_for_status``.
    """
    uri, headers = self._build_uri_and_headers(uri, endpoint, headers)

    # Use a fresh dict per call instead of a shared mutable default, so nothing
    # downstream can leak state from one call into the next.
    if retryPolicy is None:
        retryPolicy = {}
    retryPolicy = self._build_retry_policy(retryPolicy)

    # The lambda lets _with_retry re-issue the exact same DELETE request on each attempt.
    response = _with_retry(lambda: requests.delete(uri, headers=headers, **kwargs), **retryPolicy)

    # Status checking is delegated to the helper (presumably raises on error
    # responses -- confirm against exceptions._raise_for_status).
    exceptions._raise_for_status(response, verbose=self.debug)
# Get token
md5 = hashlib.md5(content.encode("utf-8")).hexdigest()
token = self._createChunkedUploadToken(md5, "message", contentType)
retry_policy=self._build_retry_policy(
{"retry_errors":['We encountered an internal error. Please try again.']})
i = 1
chunk_record = {'chunk-number':i}
# Get the signed S3 URL
url = self._createChunkedFileUploadChunkURL(i, token)
# PUT the chunk to S3
response = _with_retry(
lambda: requests.put(url, data=content.encode("utf-8"), headers=headers),
**retry_policy)
chunk_record['response-status-code'] = response.status_code
chunk_record['response-headers'] = response.headers
if response.text:
chunk_record['response-body'] = response.text
# Is requests closing response stream? Let's make sure:
# "Note that connections are only released back to
# the pool for reuse once all body data has been
# read; be sure to either set stream to False or
# read the content property of the Response object."
# see: http://docs.python-requests.org/en/latest/user/advanced/#keep-alive
try:
if response:
with open(filepath, 'rb') as f:
for chunk in utils.chunks(f, chunksize):
i += 1
chunk_record = {'chunk-number':i}
def put_chunk():
# Get the signed S3 URL
url = self._createChunkedFileUploadChunkURL(i, token)
chunk_record['url'] = url
if progress:
sys.stdout.write('.')
sys.stdout.flush()
return requests.put(url, data=chunk, headers=headers)
# PUT the chunk to S3
response = _with_retry(put_chunk, **retry_policy)
if progress:
sys.stdout.write(',')
sys.stdout.flush()
chunk_record['response-status-code'] = response.status_code
chunk_record['response-headers'] = response.headers
if response.text:
chunk_record['response-body'] = response.text
diagnostics['chunks'].append(chunk_record)
# Is requests closing response stream? Let's make sure:
# "Note that connections are only released back to
# the pool for reuse once all body data has been
# read; be sure to either set stream to False or
# read the content property of the Response object."
"""
Performs a REST PUT operation to the Synapse server.
:param uri: URI on which the PUT is performed
:param endpoint: Server endpoint, defaults to self.repoEndpoint
:param body: The payload to be delivered
:param headers: Dictionary of headers to use rather than the API-key-signed default set of headers
:param kwargs: Any other arguments taken by a `requests <http://docs.python-requests.org/en/latest/>`_ method
:returns: JSON encoding of response
"""
uri, headers = self._build_uri_and_headers(uri, endpoint, headers)
retryPolicy = self._build_retry_policy(retryPolicy)
response = _with_retry(lambda: requests.put(uri, data=body, headers=headers, **kwargs), **retryPolicy)
exceptions._raise_for_status(response, verbose=self.debug)
return self._return_rest_body(response)