Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def is_pinned(client, path):
    """Tell whether ``path`` is reported as pinned by ``client.pin.ls``.

    Returns ``True`` when the listing succeeds and the last ``/``-separated
    component of ``path`` appears under the response's ``"Keys"`` entry.
    Returns ``False`` when the call fails with an ``ErrorResponse`` whose
    first argument contains ``"not pinned"``.

    Raises:
        ipfshttpclient.exceptions.ErrorResponse: re-raised unchanged for any
            other error response.
        AssertionError: if the listing succeeds but does not mention the
            requested object.
    """
    try:
        listing = client.pin.ls(path)
    except ipfshttpclient.exceptions.ErrorResponse as exc:
        # args[0] is treated as the error text of the response.
        if "not pinned" in exc.args[0]:
            return False
        raise

    # Only the final path component is looked up among the listed keys.
    object_name = path.split("/")[-1]
    assert object_name in listing["Keys"]
    return True
def test_http_client_failure(http_client, http_server):
    """Check that a 500 reply with a JSON ``Message`` raises ``ErrorResponse``."""
    # Have the test server answer with an error payload and HTTP status 500.
    error_body = json.dumps({"Message": "Someone set us up the bomb"})
    http_server.serve_content(error_body, 500)

    with pytest.raises(ipfshttpclient.exceptions.ErrorResponse):
        http_client.request("/http_client_fail")
def test_add_nocopy_with_relative_path(client):
    """Check that ``add(..., nocopy=True)`` on a relative path is rejected.

    The call is expected to fail with an ``ErrorResponse`` whose first
    argument contains ``"missing file path"``.
    """
    relative_path = calc_path_rel_to_cwd(FAKE_FILE1_PATH)

    reported = None
    try:
        client.add(relative_path, nocopy=True)
    except ipfshttpclient.exceptions.ErrorResponse as exc:
        reported = exc.args[0]

    # For relative paths, the multipart streaming layer won't append the
    # Abspath header, so the server reports the missing header. Note that
    # currently the server does not report an error if Abspath is present
    # but is a relative or nonexistent path -- instead, it silently ignores
    # nocopy and adds the file to the blockstore (bug).
    assert reported is not None and "missing file path" in reported
def add_artifact(self, artifact, session, redis=None):
    """Store ``artifact`` through ``self.client`` and return its identifier.

    After adding the content, a best-effort ``pin.rm`` is issued for it, and
    the artifact is optionally cached in ``redis``.

    Args:
        artifact: content handed to ``self.client.add_str``.
        session: accepted for interface compatibility; not used here.
        redis: optional cache client; when truthy, the artifact is stored
            under ``polyswarmd:<identifier>`` with ``ex=300``.

    Returns:
        The value returned by ``self.client.add_str(artifact)``.
    """
    # client.add would treat a str/bytes argument as a file to load, so the
    # string-specific call is used instead.
    stored_uri = self.client.add_str(artifact)

    # add_str offers no way to pass pin=False, so the pin is removed with a
    # second call.
    try:
        self.client.pin.rm(stored_uri, timeout=1)
    except (
        ipfshttpclient.exceptions.ErrorResponse,
        ipfshttpclient.exceptions.TimeoutError,
    ) as err:
        # Only seen when the pin didn't exist, not a big deal.
        logger.warning('Got error when removing pin: %s', err)

    if redis:
        redis.set(f'polyswarmd:{stored_uri}', artifact, ex=300)

    return stored_uri
an IPFS daemon."""
class StatusError(CommunicationError):
    """Signals that the daemon answered our request with an error."""
class ErrorResponse(StatusError):
    """Signals that the daemon replied with an error message because it could
    not carry out the requested operation."""

    def __init__(self, message, original):
        # The base initialiser receives the original exception first and the
        # message second, i.e. the reverse of this constructor's order.
        super(ErrorResponse, self).__init__(original, message)
class PartialErrorResponse(ErrorResponse):
    """Signals that the daemon replied with an error message after it had
    already returned some data.

    The incomplete data that was received is available through the
    ``partial`` attribute."""

    def __init__(self, message, original, partial):
        # Keep the incomplete data before delegating to the parent.
        self.partial = partial
        super(PartialErrorResponse, self).__init__(message, original)
class ConnectionError(CommunicationError):
    """Signals that connecting to the service failed at the socket layer."""
class TimeoutError(CommunicationError):
try:
decoder = encoding.get_encoding("json")
for chunk in response.iter_content(chunk_size=None):
content += list(decoder.parse_partial(chunk))
content += list(decoder.parse_finalize())
except exceptions.DecodingError:
pass
# If we have decoded an error response from the server,
# use that as the exception message; otherwise, just pass
# the exception on to the caller.
if len(content) == 1 \
and isinstance(content[0], dict) \
and "Message" in content[0]:
msg = content[0]["Message"]
six.raise_from(exceptions.ErrorResponse(msg, error), error)
else:
six.raise_from(exceptions.StatusError(error), error)
def __init__(self, message, original, partial):
    """Record ``partial`` on the instance, then run ``ErrorResponse``'s
    initialiser with ``message`` and ``original``."""
    self.partial = partial
    ErrorResponse.__init__(self, message, original)