Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_update_userprefs():
    """Send a ``yo.enable_transports`` request to the local server and print the exchange.

    The request enables the ``vote`` transport with an email target for
    ``username``, a name this function reads from the enclosing scope.
    Nothing is asserted; the request and the server's reply are only
    pretty-printed.
    """
    rpc_client = HTTPClient('http://localhost:8080')

    vote_transports = {'vote': ['email', 'gareth@steemit.com']}
    outgoing = Request(
        'yo.enable_transports',
        username=username,
        transports=vote_transports,
    )

    print('request sent:')
    pprint.pprint(outgoing)

    reply = rpc_client.send(outgoing)

    print('Server response:')
    pprint.pprint(reply)
def test_bad_credentials():
    """Test needs_auth() with bad credentials.

    Signs a dummy request with a key pair that is not registered for the
    ``steemit`` account and checks that the wrapped function is answered
    with an ``INVALID_CREDENTIALS`` error instead of being executed.
    """
    tested_func = utils.needs_auth(lambda x: None)

    # does not exist in the blockchain for @steemit user
    bad_pub_key = 'STM6sXZdPhLGrDP1MRyZ2zbhXUGNAbrS7qj1o8TFwNkzg47PRQMXK'
    bad_priv_key = '5JVEH3LTxHzwfwdRB9nDZG1mmFugGWWaGk4fccz1GPt4bwtXy3F'

    request = Request('test_method', stuff=1)
    # Only the canonical request is needed here; the signature and WIF
    # returned alongside it are deliberately ignored.
    canon_req, _hex_sig, _wif = jsonrpc_auth.sign_request(
        request, bad_priv_key, bad_pub_key)

    retval = tested_func(
        username='steemit',
        orig_req=json.loads(canon_req),
        skip_auth=False,
    )

    # Identity check: comparing to None with != can be hijacked by __ne__.
    assert retval is not None
    assert retval['status'] == 'error'
    assert retval['error_type'] == 'INVALID_CREDENTIALS'
def get_ops_in_blocks_from_account_history_api(self, blocks, virtual_only=False):
    """Fetch the operations of several blocks in one batched call.

    One ``account_history_api.get_ops_in_block`` request is built per entry
    of ``blocks`` and the whole list is handed to ``self.client.send``.

    Args:
        blocks: iterable of values used as ``block_num`` in each request.
        virtual_only: forwarded as ``only_virtual`` in every request.

    Returns:
        A list holding ``result['ops']`` of every response, in the order
        the client returned them.
    """
    api = 'account_history_api'
    method = 'get_ops_in_block'
    endpoint = api + '.' + method

    # one request per block, sent together as a batch
    batch = []
    for block_num in blocks:
        params = {'block_num': block_num, 'only_virtual': virtual_only}
        batch.append(Request(endpoint, params))

    replies = self.client.send(batch)

    # pull the ops out of each individual reply
    ops_per_block = []
    for reply in replies:
        ops_per_block.append(reply['result']['ops'])
    return ops_per_block
# NOTE(review): this is the body of a function whose `def` line is not
# visible here; `request` and `username` are presumably its parameters
# (the debug message below calls it `verify_request`) -- confirm.
# A request lacking either auth parameter is rejected outright.
if not 'AuthKey' in request['params'].keys(): return False
if not 'AuthSig' in request['params'].keys(): return False
# Look up the account and collect the first element of every entry in its
# posting `key_auths` list (presumably the public key of each entry).
user_account = Account(username)
posting_keys = []
for key in user_account['posting']['key_auths']: posting_keys.append(key[0])
logger.debug('verify_request got key %s, valid keys are %s' % (request['params']['AuthKey'],str(posting_keys)))
# The supplied key must be one of the keys collected above.
if not request['params']['AuthKey'] in posting_keys: return False
# Hand the re-serialized request, the key and the signature to
# verify_request_with_pub and return whatever it returns.
return verify_request_with_pub(json.dumps(request),request['params']['AuthKey'],request['params']['AuthSig'])
if __name__ == '__main__':
    # Manual smoke test: sign a sample request with a fixed key pair, then
    # verify the resulting signature against the public key, printing each
    # step along the way.
    logging.basicConfig(level=logging.DEBUG)

    pub_key = 'STM5sWzsUCociNCxaRJhQ1WKyZVBduDUy7uR4yrifX9vLhh9LxeKv'
    priv_key = '5JxQDXSZNBLLTMqMFMGswdCnwn5KWdFv6NVSBEJTLEg1v5gvU9b'

    request = Request(
        'update_preferences',
        test_pref=1,
        details={'username': 'testuser', 'prefer_ssl': True},
    )

    # %s already applies str(), so the arguments are passed as they are.
    print('Signing request %s with private key %s' % (request, priv_key))
    canon_req, hex_sig, wif = sign_request(request, priv_key, pub_key)
    print('Got canon request: %s' % canon_req)
    print('Got auth signature: %s' % hex_sig)

    print('Verifying request with public key %s' % pub_key)
    verified = verify_request_with_pub(canon_req, pub_key, hex_sig)
    print('Verified: %s' % (verified,))
def get_blocks(self, blocks=None):
    """Fetch several blocks through ``block_api.get_block`` in one batch.

    Args:
        blocks: iterable of values used as ``block_num`` in each request.
            ``None`` (the default) is treated as "no blocks".

    Returns:
        ``None`` when ``self.is_api_available`` reports the method as
        unavailable. Otherwise a list with one dict per response: a copy of
        ``result['block']`` extended with a ``block_num`` entry parsed from
        the first 8 hex characters of the block's ``block_id``. The list is
        empty when no blocks were requested.
    """
    # block_api method
    api = 'block_api'
    method = 'get_block'
    if not self.is_api_available(api, method):
        # Kept from the original contract: an unavailable API yields None.
        return None

    # assemble list with multiple requests for batch
    endpoint = '.'.join([api, method])
    requests = [
        Request(endpoint, {'block_num': i})
        for i in (blocks if blocks is not None else ())
    ]
    if not requests:
        # An empty batch is not a valid JSON-RPC call, so skip the round
        # trip instead of trying to index whatever the server answers.
        return []

    # get response
    response = self.client.send(requests)

    # return the resulting block of each result
    result = []
    for r in response:
        block = r['result']['block']
        block_num = int(str(block['block_id'])[:8], base=16)
        result.append(dict(block, block_num=block_num))
    return result
def request(self, method_name, *args, **kwargs):
    """Send a single JSON-RPC request and return the response.

    Args:
        method_name: name of the remote method.
        *args, **kwargs: forwarded unchanged to ``Request``.

    Returns:
        Whatever the parent class's ``send`` returned, provided it does not
        contain an ``'error'`` entry.

    Raises:
        requests.RequestException: when ``'error'`` is found in the
            response. The exception message carries the method name and the
            response so the failure can be diagnosed where it is caught.
    """
    response = super(Client, self).send(Request(method_name, *args, **kwargs))
    if 'error' in response:
        print("response error")
        print(response)
        # Previously raised without arguments, which discarded the payload.
        raise requests.RequestException(
            "JSON-RPC call {!r} returned an error: {}".format(
                method_name, response))
    return response
async def _subscribe_request(self, block_height):
    """Send a ``node_ws_Subscribe`` request over ``self._websocket``.

    Args:
        block_height: value sent as the request's ``height``.

    The request also carries ``ChannelProperty().peer_id`` as ``peer_id``
    and is serialized with ``json.dumps`` before being sent.
    """
    subscribe = Request(
        method="node_ws_Subscribe",
        height=block_height,
        peer_id=ChannelProperty().peer_id,
    )
    payload = json.dumps(subscribe)
    await self._websocket.send(payload)
async def request(self, to, method, timeout__=RPC_TIMEOUT, **params):
    """Build a request for ``method`` on ``to``, send it and return the result.

    Arguments
        to          Destination, passed to ``MethodCall`` as ``dest`` and to
                    ``self.send`` as ``to``
        method      Method name passed to ``MethodCall``
        timeout__   (int,str) Custom timeout, converted with ``int()``
        **params    Passed to ``Request`` as the request parameters

    Returns whatever ``self.send`` resolves to.
    """
    call = MethodCall(dest=to, method=method, source=self.name)
    # draw the next id from the generator and use it in string form
    next_id = str(next(self.id_gen))
    rpc_request = Request(call.tos(), params, request_id=next_id)

    reply = await self.send(
        rpc_request,
        request_id=rpc_request['id'],
        timeout__=int(timeout__),
        to=to,
    )
    return reply