Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Create target file in directory
# NOTE(review): `filename` and `desc` are bound outside this excerpt
# (presumably by a loop over TEST_MFS_FILES) - confirm against the full test.
client.files.write(
TEST_MFS_DIRECTORY + "/" + filename,
desc["Name"], create=True
)
# Verify directory contents: the entry names reported for the directory
# must equal the keys of TEST_MFS_FILES.
contents = client.files.ls(TEST_MFS_DIRECTORY)[u"Entries"]
filenames1 = list(map(lambda d: d["Name"], contents))
filenames2 = list(TEST_MFS_FILES.keys())
# NOTE(review): this compares two lists, so the listing order must also match
# the key order of TEST_MFS_FILES, not just the set of names.
assert filenames1 == filenames2
# Remove directory recursively; a later stat on the same path is then
# expected to raise an ipfshttpclient error.
client.files.rm(TEST_MFS_DIRECTORY, recursive=True)
with pytest.raises(ipfshttpclient.exceptions.Error):
client.files.stat(TEST_MFS_DIRECTORY)
def is_available():
    """
    Tell whether a connection to the IPFS daemon can be established.

    The answer is computed at most once: it is stored in the module-level
    ``__is_available`` variable and reused as soon as that variable holds a
    ``bool``.

    Returns
    -------
        bool

    Raises
    ------
        ipfshttpclient.exceptions.VersionMismatch
            Re-raised from the connection attempt so that the user sees the
            incompatibility; the cached result is ``False`` in that case.
    """
    global __is_available

    # Fast path: a previous call already settled the answer
    if isinstance(__is_available, bool):
        return __is_available

    try:
        ipfshttpclient.connect()
    except ipfshttpclient.exceptions.Error as error:
        __is_available = False

        # Make sure version incompatibility is displayed to the user
        if isinstance(error, ipfshttpclient.exceptions.VersionMismatch):
            raise
    else:
        __is_available = True

    return __is_available
# Create target file in directory
# NOTE(review): `filename` and `desc` are bound outside this excerpt
# (presumably by a loop over self.test_files) - confirm against the full test.
self.http_client.files_write(
self.test_directory_path + "/" + filename,
desc[u'Name'], create=True
)
# Verify directory contents: the entry names reported for the directory
# must equal the keys of self.test_files.
contents = self.http_client.files_ls(self.test_directory_path)[u'Entries']
filenames1 = list(map(lambda d: d[u'Name'], contents))
filenames2 = list(self.test_files.keys())
# NOTE(review): assertEqual on two lists is order-sensitive, so the listing
# order must also match the key order of self.test_files.
self.assertEqual(filenames1, filenames2)
# Remove directory recursively; a later stat on the same path is then
# expected to raise an ipfshttpclient error.
self.http_client.files_rm(self.test_directory_path, recursive=True)
with self.assertRaises(ipfshttpclient.exceptions.Error):
self.http_client.files_stat(self.test_directory_path)
One or more keys whose values should be looked up
Returns
-------
str
"""
# The first key and any additional keys are sent as one argument tuple
args = (key,) + keys
res = self._client.request('/dht/get', args, decoder='json', **kwargs)
# A single dict response that carries "Extra" is answered directly
if isinstance(res, dict) and "Extra" in res:
return res["Extra"]
else:
# Otherwise walk the response items and return the first non-empty "Extra"
for r in res:
if "Extra" in r and len(r["Extra"]) > 0:
return r["Extra"]
# Reached only when no "Extra" value was returned above
raise exceptions.Error("empty response from DHT")
One or more keys whose values should be looked up
Returns
-------
str
"""
# The first key and any additional keys are sent as one argument tuple
args = (key,) + keys
res = self._client.request('/dht/get', args, decoder='json', **kwargs)
# A single dict response that carries "Extra" is answered directly
if isinstance(res, dict) and "Extra" in res:
return res["Extra"]
else:
# Otherwise walk the response items and return the first non-empty "Extra"
for r in res:
if "Extra" in r and len(r["Extra"]) > 0:
return r["Extra"]
# Reached only when no "Extra" value was returned above
raise exceptions.Error("empty response from DHT")
def __init__(self, current, minimum, maximum):
    """Record the offending version and the accepted range.

    Parameters
    ----------
    current
        Version reported for the daemon; stored as ``self.current``
    minimum
        Lower end of the supported range; stored as ``self.minimum``
    maximum
        Upper end of the supported range; stored as ``self.maximum``
    """
    self.current, self.minimum, self.maximum = current, minimum, maximum

    # Build the human-readable message handed to the base error class
    template = "Unsupported daemon version '{}' (not in range: {} – {})"
    Error.__init__(self, template.format(current, minimum, maximum))
###############
# encoding.py #
###############
class EncoderError(Error):
    """Common ancestor of every encoding and decoding related error.

    Attributes
    ----------
    encoder_name
        The ``encoder_name`` value given to the constructor
    """

    def __init__(self, message, encoder_name):
        # Remember which encoder was involved, then let the base error
        # class take care of the message itself
        self.encoder_name = encoder_name

        Error.__init__(self, message)
class EncoderMissingError(EncoderError):
    """Signals that the requested encoder class does not exist."""

    def __init__(self, encoder_name):
        # The message names the encoder that could not be found
        template = "Unknown encoder: '{}'"
        EncoderError.__init__(
            self, template.format(encoder_name), encoder_name
        )
:param config:
:param name:
:param ipns_key:
:return:
"""
ipns_addr = None
if ipns_key is None:
wanna_ipns = inquirer.shortcuts.confirm('Do you want to publish to IPNS?', default=True)
if wanna_ipns:
ipns_key = f'{IPNS_KEYS_NAME_PREFIX}_{name}'
try:
out = config.ipfs.key.gen(ipns_key, IPNS_KEYS_TYPE)
except ipfshttpclient.exceptions.Error:
use_existing = inquirer.shortcuts.confirm(f'There is already IPNS key with name \'{ipns_key}\', '
f'do you want to use it?', default=True)
if use_existing:
keys = config.ipfs.key.list()
out = next((x for x in keys['Keys'] if x['Name'] == ipns_key), None)
if out is None:
raise exceptions.RepoException('We were not able to generate or fetch the IPNS key')
else:
while True:
ipns_key = inquirer.shortcuts.text('Then please provide non-existing name for the IPNS key')
try:
out = config.ipfs.key.gen(ipns_key, IPNS_KEYS_TYPE)
break
class DecodingError(EncoderError):
    """Signals that a byte string could not be decoded into a Python object
    because of a problem with the input data.

    Attributes
    ----------
    original
        The ``original`` value given to the constructor; it is also
        interpolated into the error message
    """

    def __init__(self, encoder_name, original):
        self.original = original

        template = "Object decoding error: {}"
        EncoderError.__init__(self, template.format(original), encoder_name)
###########
# http.py #
###########
class CommunicationError(Error):
    """Common ancestor of every network communication related error.

    Attributes
    ----------
    original
        The ``original`` value given to the constructor
    """

    def __init__(self, original, _message=None):
        self.original = original

        # A truthy `_message` is used verbatim; otherwise the message is
        # derived from the class name and text of `original`
        msg = _message if _message else "{}: {}".format(
            original.__class__.__name__, str(original)
        )
        Error.__init__(self, msg)
class ProtocolError(CommunicationError):
"""Raised when parsing the response from the daemon has failed.
This can most likely occur if the service on the remote end isn't in fact
def __init__(self, current, minimum, maximum):
    """Record the offending version and the accepted range.

    Parameters
    ----------
    current
        Version reported for the daemon; stored as ``self.current``
    minimum
        Lower end of the supported range; stored as ``self.minimum``
    maximum
        Upper end of the supported range; stored as ``self.maximum``
    """
    self.current, self.minimum, self.maximum = current, minimum, maximum

    # Build the human-readable message handed to the base error class
    template = "Unsupported daemon version '{}' (not in range: {} – {})"
    Error.__init__(self, template.format(current, minimum, maximum))