# Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
    # NOTE(review): fragment of a function body — the enclosing `def` is outside
    # this chunk (presumably an app-unpublish routine); indentation reconstructed.

    if data_privkey is not None:
        # derive the publisher's public key from the given private key
        data_pubkey = get_pubkey_hex(str(data_privkey))

    if app_config is None:
        # no config supplied by the caller; fetch the app's config from mutable storage
        app_config = app_get_config(blockchain_id, app_domain, data_pubkey=data_pubkey, proxy=proxy, config_path=CONFIG_PATH )
        if 'error' in app_config:
            if not force:
                log.error("Failed to load app config for {}'s {}".format(blockchain_id, app_domain))
                return {'error': 'Failed to load app config'}

            else:
                # keep going
                app_config = None
                log.warning("Failed to load app config, but proceeding at caller request")

    # fully-qualified data IDs for the app's config blob and index file
    config_data_id = storage.make_fq_data_id(app_domain, '.blockstack')
    index_data_id = storage.make_fq_data_id(app_domain, 'index.html')

    storage_drivers = None
    if app_config is not None:
        # only use the ones we have to:
        # map the index file's URIs to the names of the drivers that can serve them
        urls = user_db.urls_from_uris(app_config['index_uris'])
        driver_names = []
        for url in urls:
            drivers = storage.get_drivers_for_url(url)
            driver_names += [d.__name__ for d in drivers]

        # deduplicate driver names
        storage_drivers = list(set(driver_names))

    ret = {}

    # delete the index
    If app_config is not None, then the driver hints will be honored.

    Return {'status': True} on success
    Return {'error': ...} on error
    """
    # NOTE(review): fragment — the `def` line and the start of this docstring
    # are outside this chunk; indentation reconstructed.
    if data_privkey is None:
        # fall back to the wallet's data private key
        assert wallet_keys, "No data private key or wallet given"
        data_privkey = wallet_keys.get('data_privkey', None)
        assert data_privkey, "Wallet does not contain a data private key"

    data_pubkey = get_pubkey_hex(data_privkey)
    proxy = get_default_proxy() if proxy is None else proxy

    res_data_id = storage.make_fq_data_id(app_domain, res_name)

    driver_hints = None
    if app_config is not None:
        # use driver hints
        driver_hints = app_config['driver_hints']

    # deletion is expressed as a signed tombstone over the resource's data ID
    tombstone = storage.make_data_tombstone(res_data_id)
    signed_tombstone = storage.sign_data_tombstone(res_data_id, data_privkey)
    res = data.delete_mutable(res_data_id, [signed_tombstone], proxy=proxy, storage_drivers=driver_hints, blockchain_id=blockchain_id, is_fq_data_id=True, config_path=config_path)
    if 'error' in res:
        log.error("Failed to delete resource {}: {}".format(res_data_id, res['error']))
        return {'error': 'Failed to delete resource'}

    return {'status': True}
    # NOTE(review): fragment — enclosing `def` and the code after "store header"
    # are outside this chunk; indentation reconstructed.

    inode_header_blob = data_blob_parse(header_blob_str)
    try:
        # parse and validate the header's fully-qualified data ID:
        # expected shape is <datastore_id>.<inode_uuid>.hdr
        header_fqid = inode_header_blob['fq_data_id']
        dev_id, header_id = storage.parse_fq_data_id(header_fqid)
        version = inode_header_blob['version']

        parts = header_id.split('.')
        assert len(parts) == 3, "len is {}".format(len(parts))
        assert parts[2] == 'hdr', "{} != {}".format(parts[2], 'hdr')
        assert parts[0] == datastore_id, "{} != {}".format(parts[0], datastore_id)

        inode_uuid = parts[1]
        # data ID for the inode payload (no '.hdr' suffix)
        idata_fqid = storage.make_fq_data_id(dev_id, '{}.{}'.format(datastore_id, inode_uuid))
    except Exception as e:
        if BLOCKSTACK_DEBUG:
            log.exception(e)

        log.error("Invalid inode fqid {}".format(header_fqid))
        return {'error': 'Invalid inode info', 'errno': errno.EINVAL}

    # store payload (no signature; we'll use the header's hash)
    res = put_mutable(idata_fqid, idata_str, datastore['pubkey'], encode_signature(0, 0), version, storage_drivers=drivers, storage_drivers_exclusive=True, config_path=config_path, proxy=proxy )
    if 'error' in res:
        log.error("Failed to replicate inode {}: {}".format(idata_fqid, res['error']))
        return {'error': 'Failed to replicate inode', 'errno': errno.EREMOTEIO}

    # store header
    ** Durability **

    Replication is best-effort.  If one storage provider driver succeeds, the put_mutable succeeds.  If they all fail, then put_mutable fails.
    More complex behavior can be had by creating a "meta-driver" that calls existing drivers' methods in the desired manner.

    Returns a dict with {'status': True, 'version': version, ...} on success
    Returns a dict with 'error' set on failure
    """
    # NOTE(review): fragment — the `def` line and docstring head are outside
    # this chunk; indentation reconstructed.
    if type(data_json) not in [dict]:
        raise ValueError("Mutable data must be a dict")

    if proxy is None:
        proxy = get_default_proxy()

    fq_data_id = storage.make_fq_data_id( name, data_id )

    name_record = None
    # fetch (and, if needed, create) the name's profile; legacy profiles cannot
    # hold mutable data and are rejected below
    user_profile, user_zonefile, created_new_zonefile = get_and_migrate_profile( name, create_if_absent=True, proxy=proxy, wallet_keys=wallet_keys, include_name_record=True )
    if 'error' in user_profile:
        return user_profile

    if created_new_zonefile:
        log.debug("User profile is in non-standard or legacy format")
        return {'error': "User profile is in legacy format, which does not support this operation.  You must first migrate it with the 'migrate' command."}

    # unpack the pieces returned by get_and_migrate_profile
    name_record = user_zonefile['name_record']
    del user_zonefile['name_record']

    user_profile = user_profile['profile']
    user_zonefile = user_zonefile['zonefile']
def _get_mutable_data_versions( data_id, device_ids, config_path=CONFIG_PATH ):
    """
    Find the highest known version of a mutable datum spread across multiple devices.

    @data_id: the (unqualified) data identifier
    @device_ids: iterable of device IDs to consult
    @config_path: path to the node configuration file

    Return {'status': True, 'version': version} on success, where version is
    the largest version any device has on record (0 if none do).
    """
    conf = get_config(config_path)
    assert conf

    # collect every version we have on record, one lookup per device
    known_versions = []
    for dev_id in device_ids:
        fqid = storage.make_fq_data_id(dev_id, data_id)
        ver = load_mutable_data_version(conf, dev_id, fqid, config_path=config_path)
        if ver is not None:
            known_versions.append(ver)

    highest = max(known_versions) if known_versions else 0
    return {'status': True, 'version': highest}
    # NOTE(review): fragment of an app-publish routine — the enclosing `def` is
    # outside this chunk; indentation reconstructed.

    # sign and replicate the app config blob under its '.blockstack' data ID
    app_cfg_sig = data.data_blob_sign( app_cfg_str, data_privkey )
    res = data.put_mutable(config_data_id, app_cfg_str, data_pubkey, app_cfg_sig, app_cfg_blob['version'], blockchain_id=dev_blockchain_id, config_path=config_path)
    if 'error' in res:
        log.error('Failed to replicate application configuration {}: {}'.format(config_data_id, res['error']))
        return {'error': 'Failed to replicate application config'}

    # what drivers to use for the index file?
    urls = user_db.urls_from_uris(app_index_uris)
    driver_names = []
    for url in urls:
        drivers = storage.get_drivers_for_url(url)
        driver_names += [d.__name__ for d in drivers]

    # deduplicate driver names
    driver_names = list(set(driver_names))
    index_data_id = storage.make_fq_data_id(app_domain, 'index.html')

    # replicate app index file (at least one must succeed)
    # NOTE: the publisher is free to use alternative URIs that are not supported; they'll just be ignored.
    app_index_blob = data.make_mutable_data_info(index_data_id, app_index_file, is_fq_data_id=True)
    app_index_blob_str = data.data_blob_serialize(app_index_blob)
    app_index_sig = data.data_blob_sign(app_index_blob_str, data_privkey)
    res = data.put_mutable( index_data_id, app_index_blob_str, data_pubkey, app_index_sig, app_index_blob['version'], blockchain_id=dev_blockchain_id, config_path=config_path, storage_drivers=app_driver_hints )
    if 'error' in res:
        log.error("Failed to replicate application index file to {}: {}".format(",".join(urls), res['error']))
        return {'error': 'Failed to replicate index file'}

    return {'status': True, 'config_fq_data_id': config_data_id, 'index_fq_data_id': index_data_id}
        # NOTE(review): fragment of a path-walking loop body — the enclosing
        # `def` and `for` over path_parts are outside this chunk; indentation
        # reconstructed to match the apparent loop structure.

        child_uuid = child_dirent['uuid']
        child_type = child_dirent['type']

        if child_type == MUTABLE_DATUM_FILE_TYPE and not get_idata:
            # done searching, and don't want data
            break

        # get child
        log.debug("Get {} at '{}'".format(child_uuid, '/' + '/'.join(path_parts[:i+1])))
        child_entry = get_inode_data(datastore_id, child_uuid, child_type, data_pubkey, drivers, device_ids, config_path=CONFIG_PATH, proxy=proxy)
        if 'error' in child_entry:
            log.error("Failed to get inode {} at {}: {}".format(child_uuid, prefix + name, child_entry['error']))
            return {'error': child_entry['error'], 'errno': child_entry['errno']}

        child_entry = child_entry['inode']
        assert child_entry['type'] == child_dirent['type'], "Corrupt inode {}".format(storage.make_fq_data_id(datastore_id,child_uuid))

        # record this path component in the result set
        path_ent = _make_path_entry(name, child_uuid, child_entry, prefix)
        ret[prefix + name] = path_ent

        # stop at a file, or at the last path component
        if child_type == MUTABLE_DATUM_FILE_TYPE or i == len(path_parts) - 1:
            break

        # keep walking
        cur_dir = child_entry
        prefix += name + '/'

    # did we reach the end?
    if i+1 < len(path_parts):
        log.debug('Out of path at "{}" (stopped at {} in {})'.format(prefix + name, i, path_parts))
        return {'error': 'Not a directory', 'errno': errno.ENOTDIR}
    Return {'error': ...} on error
    """
    # NOTE(review): fragment — the `def` line and docstring head are outside
    # this chunk, and the blob_data dict literal is cut off at the end;
    # indentation reconstructed.
    conf = get_config(path=config_path)
    assert conf

    fq_data_id = None

    device_id = get_local_device_id(config_dir=os.path.dirname(config_path))
    if device_id is None:
        raise Exception("Failed to get device ID")

    if device_ids is None:
        device_ids = [device_id]

    # v2 mutable data from this device
    fq_data_id = storage.make_fq_data_id(device_id, data_id)

    # get the version to use across all devices
    if version is None:
        version_info = _get_mutable_data_versions( data_id, device_ids, config_path=config_path)
        if version_info['version'] > 0 and create:
            # caller asked to create, but a version already exists somewhere
            log.error("Already exists: {}".format(fq_data_id))
            return {'error': 'Data exists', 'errno': errno.EEXIST}

        version = version_info['version'] + 1

    if timestamp is None:
        timestamp = int(time.time())

    blob_data = {
        'fq_data_id': fq_data_id,
        'data': data_payload,
def app_get_resource( blockchain_id, app_domain, res_name, app_config=None, data_pubkey=None, proxy=None, config_path=CONFIG_PATH ):
    """
    Get a named application resource from mutable storage

    data_pubkey should be the publisher's public key

    If app_config is not None, then the driver hints will be honored.

    Return {'status': True, 'res': resource} on success
    Return {'error': ...} on error
    """
    proxy = get_default_proxy() if proxy is None else proxy
    res_data_id = storage.make_fq_data_id(app_domain, res_name)

    urls = None
    if app_config is not None:
        # use driver hints: previously `driver_hints` was read but never
        # applied, so the hints were silently ignored despite the docstring.
        # Restrict the candidate drivers to the hinted driver names; fall back
        # to all drivers if the hints match nothing (best-effort).
        driver_hints = app_config['driver_hints']
        drivers = storage.get_storage_handlers()
        if driver_hints:
            hinted_drivers = [d for d in drivers if d.__name__ in driver_hints]
            if hinted_drivers:
                drivers = hinted_drivers

        urls = storage.get_driver_urls( res_data_id, drivers )

    res = data.get_mutable( res_data_id, data_pubkey=data_pubkey, proxy=proxy, config_path=config_path, urls=urls, blockchain_id=blockchain_id, is_fq_data_id=True )
    if 'error' in res:
        log.error("Failed to get resource {}: {}".format(res_data_id, res['error']))
        return {'error': 'Failed to load resource'}

    return {'status': True, 'res': res['data']}
    # NOTE(review): fragment of an account-data lookup routine — the enclosing
    # `def` and the code after the validity check are outside this chunk;
    # indentation reconstructed.

    if blockstack_profiles.is_profile_in_legacy_format( user_zonefile ) or not user_db.is_user_zonefile( user_zonefile ):
        # zonefile is a legacy profile.  There is no account data
        log.info("Profile is in legacy format.  No account data.")
        return {'status': True}

    res = app_data_sanity_check( name, user_profile, service_id, account_id, data_id, None )
    if 'error' in res:
        return res

    storage_drivers = res['storage_drivers']
    data_pubkey = res['data_pubkey']

    # NOTE: account data paths include service and account IDs
    account_data_id = app_account_data_id( service_id, account_id, data_id )
    fq_data_id = storage.make_fq_data_id( name, account_data_id )
    urls = storage.make_mutable_data_urls( fq_data_id, use_only=storage_drivers )

    if version is None:
        # use the last-seen version, or 1 if we have none on record
        version = load_mutable_data_version(conf, name, account_data_id)
        if version is None:
            version = 1

    # get the data
    mutable_data = storage.get_mutable_data(fq_data_id, data_pubkey, urls=urls )
    if mutable_data is None:
        return {'error': "Failed to look up mutable datum"}

    # sanity-check the returned payload's shape
    if type(mutable_data) not in [dict] or 'data' not in mutable_data or 'version' not in mutable_data:
        log.error("Invalid application data %s.%s.%s in %s" % (service_id, account_id, data_id, name))
        return {'error': 'Invalid application data'}