Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param block_number:
:param udblocks: /blockchain/ud/history data of given key
:return:
"""
try:
if not udblocks:
udblocks = await self._bma_connector.get(currency, bma.blockchain.ud)
udblocks = udblocks['result']['blocks']
ud_block_number = next(b for b in udblocks if b <= block_number)
block = await self._bma_connector.get(currency, bma.blockchain.block, {'number': ud_block_number})
return block['dividend'], block['unitbase']
except StopIteration:
self._logger.debug("No dividend generated before {0}".format(block_number))
except NoPeerAvailable as e:
self._logger.debug(str(e))
except errors.DuniterError as e:
if e.ucode == errors.BLOCK_NOT_FOUND:
self._logger.debug(str(e))
else:
raise
return 0, 0
toast.display(self.tr("UID broadcast"), self.tr("Identity broadcasted to the network"))
QApplication.restoreOverrideCursor()
self.config_dialog.next()
else:
self.config_dialog.label_error.setText(self.tr("Error") + " " + \
self.tr("{0}".format(result[1])))
if self.app.preferences['notifications']:
toast.display(self.tr("Error"), self.tr("{0}".format(result[1])))
QApplication.restoreOverrideCursor()
self.config_dialog.community = community
elif registered[0] is False and registered[2]:
self.config_dialog.label_error.setText(self.tr("""Your pubkey or UID was already found on the network.
Yours : {0}, the network : {1}""".format(registered[1], registered[2])))
else:
self.config_dialog.label_error.setText(self.tr("Your account already exists on the network"))
except (MalformedDocumentError, ValueError, errors.DuniterError,
aiohttp.errors.ClientError, aiohttp.errors.DisconnectedError) as e:
session.close()
self.config_dialog.label_error.setText(str(e))
except NoPeerAvailable:
self.config_dialog.label_error.setText(self.tr("Could not connect. Check node peering entry"))
except aiohttp.errors.TimeoutError:
self.config_dialog.label_error.setText(self.tr("Could not connect. Check hostname, ip address or port"))
current_block_number = self.community.network.current_blockUID.number
if current_block_number:
text += self.tr("Block {0}").format(current_block_number)
try:
block = await self.community.get_block(current_block_number)
text += " ({0})".format(QLocale.toString(
QLocale(),
QDateTime.fromTime_t(block['medianTime']),
QLocale.dateTimeFormat(QLocale(), QLocale.NarrowFormat)
))
except NoPeerAvailable as e:
logging.debug(str(e))
text += " ( ### ) "
except errors.DuniterError as e:
if e.ucode == errors.BLOCK_NOT_FOUND:
logging.debug(str(e))
if len(self.community.network.synced_nodes) == 0:
self.button_membership.setEnabled(False)
self.button_certification.setEnabled(False)
self.button_send_money.setEnabled(False)
else:
self.button_send_money.setEnabled(True)
self.refresh_quality_buttons()
if self.community.network.quality > 0.66:
icon = ':/icons/connected'
elif self.community.network.quality > 0.33:
icon = ':/icons/weak_connect'
else:
icon = ':/icons/disconnected'
async def execute_requests(parsers, search):
tries = 0
request = bma.wot.CertifiersOf
nonlocal registered
#TODO: The algorithm is quite dirty
#Multiplying the tries without any reason...
while tries < 3 and not registered[0] and not registered[2]:
try:
data = await community.bma_access.simple_request(request,
req_args={'search': search})
if data:
registered = parsers[request](data)
tries += 1
except errors.DuniterError as e:
if e.ucode in (errors.NO_MEMBER_MATCHING_PUB_OR_UID,
e.ucode == errors.NO_MATCHING_IDENTITY):
if request == bma.wot.CertifiersOf:
request = bma.wot.Lookup
tries = 0
else:
tries += 1
else:
tries += 1
except asyncio.TimeoutError:
tries += 1
except ClientError:
tries += 1
async def send_selfcert(self, password, community):
"""
Send our self certification to a target community

The certification is stamped with the UID of the community's current
block, signed with a key built from ``self.salt`` and ``password``, and
broadcast through ``bma.wot.Add``.

:param str password: The account SigningKey password
:param community: The community target of the self certification
:raises errors.DuniterError: when fetching the current block fails with
any code other than ``errors.NO_CURRENT_BLOCK``
"""
try:
# Rebuild the signed document of the current block to extract its UID.
block_data = await community.bma_access.simple_request(bma.blockchain.Current)
signed_raw = "{0}{1}\n".format(block_data['raw'], block_data['signature'])
block_uid = Block.from_signed_raw(signed_raw).blockUID
except errors.DuniterError as e:
if e.ucode == errors.NO_CURRENT_BLOCK:
# No current block reported by the node: use the empty block UID.
block_uid = BlockUID.empty()
else:
raise
# NOTE(review): the trailing None is presumably the not-yet-computed
# signature, filled in by sign() below - confirm against duniterpy.
selfcert = SelfCertification(PROTOCOL_VERSION,
community.currency,
self.pubkey,
self.name,
block_uid,
None)
key = SigningKey(self.salt, password)
selfcert.sign([key])
logging.debug("Key publish : {0}".format(selfcert.signed_raw()))
responses = await community.bma_access.broadcast(bma.wot.Add, {}, {'identity': selfcert.signed_raw()})
# Starting value of the result pair, set before the responses are examined.
result = (False, "")
for r in responses:
async def search_text(self, text):
    """
    Search identities using given text and refresh the model with them

    Failures of the lookup never propagate to the caller; they are logged:
    a missing block or the absence of any reachable peer at debug level,
    any other Duniter error at error level.

    :param str text: text to search
    :return: None
    """
    try:
        identities = await self.model.lookup_identities(text)
        self.model.refresh_identities(identities)
    except errors.DuniterError as e:
        if e.ucode == errors.BLOCK_NOT_FOUND:
            logging.debug(str(e))
        else:
            # Unexpected node errors are still not raised, so the search
            # stays best-effort, but they must leave a trace in the log.
            logging.error(str(e))
    except NoPeerAvailable as e:
        logging.debug(str(e))
tries = 0
block_doc = None
block = None
while block is None and tries < 3:
try:
block = await community.bma_access.future_request(bma.blockchain.Block,
req_args={'number': number})
signed_raw = "{0}{1}\n".format(block['raw'],
block['signature'])
try:
block_doc = Block.from_signed_raw(signed_raw)
except TypeError:
logging.debug("Error in {0}".format(number))
block = None
tries += 1
except errors.DuniterError as e:
if e.ucode == errors.BLOCK_NOT_FOUND:
block = None
tries += 1
return block_doc
try:
identity = self.identities_service.get_identity(self.connection.pubkey, self.connection.uid)
if identity:
mstime_remaining = self.identities_service.ms_time_remaining(identity)
is_member = identity.member
outdistanced = identity.outdistanced
written = identity.written
if not written:
identity_expiration = identity.timestamp + self.parameters().sig_window
identity_expired = identity_expiration < self.blockchain_processor.time(self.connection.currency)
identity_expiration = self.blockchain_processor.adjusted_ts(self.app.currency,
identity_expiration)
nb_certs = len(self.identities_service.certifications_received(identity.pubkey))
if not identity.outdistanced:
outdistanced_text = self.tr("In WoT range")
except errors.DuniterError as e:
if e.ucode == errors.NO_MEMBER_MATCHING_PUB_OR_UID:
pass
else:
self._logger.error(str(e))
return {
'written': written,
'idty_expired': identity_expired,
'idty_expiration': identity_expiration,
'amount': localized_amount,
'is_outdistanced': outdistanced,
'outdistanced': outdistanced_text,
'nb_certs': nb_certs,
'nb_certs_required': nb_certs_required,
'mstime': mstime_remaining,
'membership_state': is_member,