Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def refresh_uid(self):
    """
    Refresh the node UID

    Requests a lookup of ``self.pubkey`` through the node's own endpoint and
    keeps the uid carrying the most recent timestamp among the results whose
    pubkey equals ``self.pubkey``. ``identity_changed`` is emitted when the
    stored uid changes.

    The node state is set to ``Node.ONLINE`` once the lookup request has
    answered, and to ``Node.OFFLINE`` on a network-level error or on a
    Duniter error other than ``NO_MATCHING_IDENTITY``.
    """
    conn_handler = self.endpoint.conn_handler()
    try:
        data = await bma.wot.Lookup(conn_handler, self.pubkey).get(self._session)
        self.state = Node.ONLINE
        timestamp = BlockUID.empty()
        uid = ""
        for result in data['results']:
            if result["pubkey"] != self.pubkey:
                continue
            # A dedicated loop variable keeps ``uid`` holding the selected
            # uid string only; it is never rebound to a whole uid entry.
            for uid_data in result['uids']:
                uid_timestamp = BlockUID.from_str(uid_data["meta"]["timestamp"])
                if uid_timestamp >= timestamp:
                    # Store the parsed BlockUID (not the raw string) so the
                    # next comparison is BlockUID against BlockUID.
                    timestamp = uid_timestamp
                    uid = uid_data["uid"]
        if self._uid != uid:
            self._uid = uid
            self.identity_changed.emit()
    except errors.DuniterError as e:
        if e.ucode == errors.NO_MATCHING_IDENTITY:
            logging.debug("UID not found : {0}".format(self.pubkey[:5]))
        else:
            # NOTE(review): nesting reconstructed from unindented source;
            # confirm the offline transition belongs to this branch only.
            logging.debug("error in uid reply : {0}".format(self.pubkey[:5]))
            self.state = Node.OFFLINE
            self.identity_changed.emit()
    except (ClientError, gaierror, TimeoutError, DisconnectedError, ValueError) as e:
        logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey[:5]))
        self.state = Node.OFFLINE
except jsonschema.ValidationError as e:
async def _async_execute_search_text(self, checked):
    """
    Search identities matching the text typed in the search field

    Cancels any pending ``_async_search_direct_connections`` task, shows the
    busy indicator, sends the text to ``bma.wot.Lookup`` and passes one
    identity per uid of every result (built with
    ``BlockchainState.BUFFERED``) to ``refresh_identities``. No request is
    sent when the text is shorter than 2 characters. The busy indicator is
    hidden again on every exit path.

    :param checked: Not used by this method
    """
    cancel_once_task(self, self._async_search_direct_connections)
    self.ui.busy.show()
    try:
        text = self.ui.edit_textsearch.text()
        # The length check lives inside the ``try`` so that the ``finally``
        # clause also hides the busy indicator on this early exit.
        if len(text) < 2:
            return
        response = await self.community.bma_access.future_request(bma.wot.Lookup, {'search': text})
        identities = [
            Identity.from_handled_data(uid_data['uid'],
                                       identity_data['pubkey'],
                                       BlockUID.from_str(uid_data['meta']['timestamp']),
                                       BlockchainState.BUFFERED)
            for identity_data in response['results']
            for uid_data in identity_data['uids']
        ]
        self.ui.edit_textsearch.clear()
        self.ui.edit_textsearch.setPlaceholderText(text)
        await self.refresh_identities(identities)
    except errors.DuniterError as e:
        # Only BLOCK_NOT_FOUND is logged; other Duniter error codes are
        # swallowed without a trace (behaviour kept as it was).
        if e.ucode == errors.BLOCK_NOT_FOUND:
            logging.debug(str(e))
    except NoPeerAvailable as e:
        logging.debug(str(e))
    finally:
        self.ui.busy.hide()
def _parse_pubkey_lookup(data):
    """
    Find which pubkey owns ``self.name`` in a lookup reply

    Walks every uid of every result, tracking the most recent timestamp
    seen so far. Each time a uid at least as recent as that timestamp is
    adopted and equals ``self.name``, the pubkey of its result is recorded.

    :param dict data: Lookup reply; reads ``data['results']``, and for each
        result its ``pubkey`` and ``uids`` (each uid entry providing ``uid``
        and ``meta.timestamp``)
    :return: ``(self.pubkey == found_pubkey, self.pubkey, found_pubkey)``
        when a pubkey was recorded for ``self.name``, otherwise
        ``(False, self.pubkey, None)``
    :rtype: tuple
    """
    timestamp = BlockUID.empty()
    found_result = ["", ""]
    for result in data['results']:
        for uid_data in result['uids']:
            uid_timestamp = BlockUID.from_str(uid_data["meta"]["timestamp"])
            if uid_timestamp >= timestamp:
                timestamp = uid_timestamp
                found_uid = uid_data["uid"]
                # The name is checked at the moment the uid is adopted, so a
                # uid remembered from a previous result can never be paired
                # with the pubkey of a later, unrelated result.
                # NOTE(review): nesting reconstructed from unindented
                # source; confirm against the upstream file.
                if found_uid == self.name:
                    found_result = result['pubkey'], found_uid
    if found_result[1] == self.name:
        return self.pubkey == found_result[0], self.pubkey, found_result[0]
    return False, self.pubkey, None
def _parse_uid_lookup(data):
    """
    Compare ``identity.uid`` with the latest uid published for its pubkey

    Only results whose pubkey equals ``identity.pubkey`` are considered.
    For each uid entry at least as recent as the newest one seen so far,
    ``found_identity.blockstamp`` and ``found_identity.signature`` are
    updated from that entry.

    :param dict data: Lookup reply; reads ``data['results']``
    :return: ``(identity.uid == found_uid, identity.uid, found_uid)`` where
        ``found_uid`` is ``""`` when no entry was selected
    :rtype: tuple
    """
    latest = BlockUID.empty()
    found_uid = ""
    own_results = (entry for entry in data['results']
                   if entry["pubkey"] == identity.pubkey)
    for own_result in own_results:
        for candidate in own_result['uids']:
            candidate_stamp = BlockUID.from_str(candidate["meta"]["timestamp"])
            if candidate_stamp >= latest:
                latest = candidate_stamp
                found_identity.blockstamp = latest
                found_uid = candidate["uid"]
                found_identity.signature = candidate["self"]
    return identity.uid == found_uid, identity.uid, found_uid
:param current_block: Current block data
:param pubkey: UID/Public key
:rtype: Identity
"""
# Here we request for the path wot/lookup/pubkey
lookup_data = await client(bma.wot.lookup, pubkey)
identity = None
# parse results
for result in lookup_data["results"]:
if result["pubkey"] == pubkey:
uids = result["uids"]
uid_data = uids[0]
# capture data
timestamp = BlockUID.from_str(uid_data["meta"]["timestamp"])
uid = uid_data["uid"] # type: str
signature = uid_data["self"] # type: str
# return self-certification document
identity = Identity(
version=10,
currency=current_block["currency"],
pubkey=pubkey,
uid=uid,
ts=timestamp,
signature=signature,
)
break
return identity
def from_json(cls, json_data, version):
    """
    Create a person from json data

    :param dict json_data: The person as a dict in json format; reads the
        keys ``pubkey``, ``uid``, ``local_state``, ``blockchain_state`` and,
        depending on ``version``, ``sigdate``
    :param version: Version compared with ``0.20.0dev0`` to decide whether
        ``sigdate`` is read from ``json_data``
    :return: A new instance built from the decoded fields
    """
    pubkey = json_data['pubkey']
    uid = json_data['uid']
    local_state = LocalState[json_data['local_state']]
    blockchain_state = BlockchainState[json_data['blockchain_state']]
    # An empty BlockUID is used for data below 0.20.0dev0 or when the stored
    # sigdate is falsy.
    has_sigdate = version >= parse_version("0.20.0dev0") and json_data['sigdate']
    sigdate = BlockUID.from_str(json_data['sigdate']) if has_sigdate else BlockUID.empty()
    return cls(uid, pubkey, sigdate, local_state, blockchain_state)
def _refresh_uid(self, uids):
    """
    Refresh UID from uids list, got from a successful lookup request

    Does nothing unless ``self.local_state`` is ``LocalState.NOT_FOUND``.
    Otherwise every entry at least as recent as the newest one seen so far
    becomes the identity's uid, and the identity is flagged
    ``BlockchainState.BUFFERED`` / ``LocalState.PARTIAL``.

    :param list uids: UIDs got from a lookup request
    """
    newest = BlockUID.empty()
    if self.local_state != LocalState.NOT_FOUND:
        return
    for entry in uids:
        entry_stamp = BlockUID.from_str(entry["meta"]["timestamp"])
        if entry_stamp >= newest:
            newest = entry_stamp
            self.uid = entry["uid"]
            self.blockchain_state = BlockchainState.BUFFERED
            self.local_state = LocalState.PARTIAL
def _parse_uid_lookup(data):
    """
    Compare ``self.name`` with the latest uid published for ``self.pubkey``

    Only results whose pubkey equals ``self.pubkey`` are considered; among
    their uid entries, the one with the most recent timestamp wins.

    :param dict data: Lookup reply; reads ``data['results']``, and for each
        result its ``pubkey`` and ``uids`` (each uid entry providing ``uid``
        and ``meta.timestamp``)
    :return: ``(self.name == found_uid, self.name, found_uid)`` where
        ``found_uid`` is ``""`` when no entry was selected
    :rtype: tuple
    """
    timestamp = BlockUID.empty()
    found_uid = ""
    for result in data['results']:
        if result["pubkey"] == self.pubkey:
            for uid_data in result['uids']:
                uid_timestamp = BlockUID.from_str(uid_data["meta"]["timestamp"])
                if uid_timestamp >= timestamp:
                    # Keep the parsed BlockUID rather than the raw string so
                    # the next iteration compares two BlockUID values.
                    timestamp = uid_timestamp
                    found_uid = uid_data["uid"]
    return self.name == found_uid, self.name, found_uid
def _parse_uid_lookup(data):
    """
    Check ``identity.uid`` against the newest uid found for its pubkey

    Results with a pubkey different from ``identity.pubkey`` are ignored.
    Whenever an entry at least as recent as the newest one seen so far is
    met, its timestamp and ``self`` field are copied to
    ``found_identity.blockstamp`` and ``found_identity.signature``.

    :param dict data: Lookup reply; reads ``data['results']``
    :return: ``(identity.uid == found_uid, identity.uid, found_uid)`` where
        ``found_uid`` is ``""`` when no entry was selected
    :rtype: tuple
    """
    newest_stamp = BlockUID.empty()
    found_uid = ""
    for lookup_result in data['results']:
        if lookup_result["pubkey"] != identity.pubkey:
            continue
        for uid_entry in lookup_result['uids']:
            entry_stamp = BlockUID.from_str(uid_entry["meta"]["timestamp"])
            if entry_stamp >= newest_stamp:
                newest_stamp = entry_stamp
                found_identity.blockstamp = newest_stamp
                found_uid = uid_entry["uid"]
                found_identity.signature = uid_entry["self"]
    return identity.uid == found_uid, identity.uid, found_uid
def load(cls, data):
    """
    Create a new transfer from a dict in json format.

    :param dict data: The loaded data; reads the keys ``hash``, ``state``,
        ``blockUID``, ``metadata`` and ``local``
    :return: A new transfer
    :rtype: Transfer
    """
    sha_hash = data['hash']
    state = TransferState[data['state']]
    # A falsy stored blockUID is passed on as None instead of being parsed.
    if data['blockUID']:
        block_uid = BlockUID.from_str(data['blockUID'])
    else:
        block_uid = None
    return cls(sha_hash, state, block_uid, data['metadata'], data['local'])