Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def tearDownModule():
    """Set ``config.debug`` back to False.

    The name matches unittest's module-level teardown fixture.
    """
    config.debug = False
def setUpModule():
    """Set ``config.debug`` to True.

    The name matches unittest's module-level setup fixture.
    """
    config.debug = True
def tearDown(self):
    """Reset ``config.notification_errors`` to False."""
    config.notification_errors = False
import time
from aiohttp import web
import json
from jsonrpcserver.aio import methods
from jsonrpcserver import config, status
from jsonrpcserver.exceptions import JsonRpcServerError
from spruned.application import exceptions
from spruned.application.exceptions import InvalidPOWException, ItemNotFoundException
from spruned.application.logging_factory import Logger
from spruned.daemon.exceptions import GenesisTransactionRequestedException
from spruned import __version__ as spruned_version
from spruned.dependencies.pybitcointools import address_to_script
# Set jsonrpcserver's module-wide ``schema_validation`` option to False.
config.schema_validation = False
API_HELP = \
"""== Blockchain ==
getbestblockhash
getblock "blockhash" ( verbosity )
getblockchaininfo
getblockcount
getblockhash height
getblockheader "hash" ( verbose )
gettxout "txid" n ( include_mempool )
getmempoolinfo
getrawmempool
== Rawtransactions ==
getrawtransaction "txid" ( verbose )
sendrawtransaction "hexstring" ( allowhighfees )
from quarkchain.cluster.log_filter import LogFilter
from quarkchain.cluster.subscription import SUB_LOGS
# defaults
# NOTE(review): presumably the gas limit / gas price applied when a request
# omits them -- confirm against the call sites.
DEFAULT_STARTGAS = 100 * 1000  # 100,000
DEFAULT_GASPRICE = 10 * denoms.gwei
# Allow 16 MB request for submitting big blocks
# TODO: revisit this parameter
JSON_RPC_CLIENT_REQUEST_MAX_SIZE = 16 * 1024 * 1024  # 16,777,216
# Disable jsonrpcserver logging
config.log_requests = False
config.log_responses = False
# "0x" followed by Constant.TX_ID_HEX_LENGTH zero characters.
EMPTY_TX_ID = "0x" + "0" * Constant.TX_ID_HEX_LENGTH
def quantity_decoder(hex_str, allow_optional=False):
    """Decode a "0x"-prefixed hexadecimal quantity string into an int.

    Args:
        hex_str: The encoded quantity, e.g. ``"0x1f"``.
        allow_optional: When true, a ``hex_str`` of ``None`` is passed
            through as ``None`` instead of being rejected.

    Returns:
        The decoded integer, or ``None`` for an allowed missing value.

    Raises:
        InvalidParams: If ``hex_str`` is not a string, does not start with
            lowercase ``"0x"``, has nothing after the prefix, or cannot be
            parsed by ``int(hex_str, 16)``.
    """
    if hex_str is None and allow_optional:
        return None
    # Non-string input (a JSON number, or None when it is not optional) is a
    # parameter error; calling .startswith() on it would raise AttributeError
    # instead of the InvalidParams that callers expect.
    if not isinstance(hex_str, str):
        raise InvalidParams("Invalid quantity encoding")
    # must start with "0x" and carry at least one character after it
    if len(hex_str) < 3 or not hex_str.startswith("0x"):
        raise InvalidParams("Invalid quantity encoding")
    try:
        return int(hex_str, 16)
    except ValueError as err:
        # Chain the parse error so the root cause stays visible in tracebacks.
        raise InvalidParams("Invalid quantity encoding") from err
import uuid
from quarkchain.cluster.log_filter import LogFilter
from quarkchain.cluster.subscription import SUB_LOGS
# defaults
# NOTE(review): presumably the gas limit / gas price applied when a request
# omits them -- confirm against the call sites.
DEFAULT_STARTGAS = 100 * 1000  # 100,000
DEFAULT_GASPRICE = 10 * denoms.gwei
# Allow 16 MB request for submitting big blocks
# TODO: revisit this parameter
JSON_RPC_CLIENT_REQUEST_MAX_SIZE = 16 * 1024 * 1024  # 16,777,216
# Disable jsonrpcserver logging
config.log_requests = False
config.log_responses = False
# "0x" followed by Constant.TX_ID_HEX_LENGTH zero characters.
EMPTY_TX_ID = "0x" + "0" * Constant.TX_ID_HEX_LENGTH
def quantity_decoder(hex_str, allow_optional=False):
"""Decode `hexStr` representing a quantity."""
if allow_optional and hex_str is None:
return None
# must start with "0x"
if not hex_str.startswith("0x") or len(hex_str) < 3:
raise InvalidParams("Invalid quantity encoding")
try:
return int(hex_str, 16)
except ValueError:
from jsonrpcclient.request import Request
from jsonrpcserver import config
from jsonrpcserver.aio import AsyncMethods
from websockets import WebSocketClientProtocol
from loopchain import configure as conf
from loopchain import utils
from loopchain.baseservice import ObjectManager, TimerService, Timer
from loopchain.blockchain import AnnounceNewBlockError
from loopchain.blockchain.blocks import BlockSerializer, BlockVerifier
from loopchain.blockchain.votes import Votes
from loopchain.channel.channel_property import ChannelProperty
from loopchain.protos import message_code
# Switch off jsonrpcserver's request and response logging.
config.log_requests = False
config.log_responses = False
# Module-level AsyncMethods registry.
# NOTE(review): the name suggests it holds websocket handlers -- confirm.
ws_methods = AsyncMethods()
# Set of three message_code.Response values.
# NOTE(review): presumably the responses treated as a failed connection --
# confirm at the use site.
CONNECTION_FAIL_CONDITIONS = {
message_code.Response.fail_subscribe_limit,
message_code.Response.fail_connection_closed,
message_code.Response.fail_connect_to_leader
}
class UnregisteredException(Exception):
    """Exception type with no behaviour beyond ``Exception``.

    Raising it during the Watch state makes this node transition to the
    SubscribeNetwork state and initialise the next radiostation target.
    """
import asyncio
import datetime
import functools
import json
import os
from aiohttp import web
from jsonrpcserver import config
from jsonrpcserver.async_methods import AsyncMethods
import structlog
import uvloop
import yo.api_methods
# Switch off jsonrpcserver's response and request logging.
config.log_responses = False
config.log_requests = False
# Module-level structlog logger named after this module.
logger = structlog.getLogger(__name__)
# Install uvloop's event loop policy for asyncio at import time.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
def default_json(obj):
    """Fallback serializer: render ``datetime.datetime`` values via ``str()``.

    Args:
        obj: The object to convert.

    Returns:
        ``str(obj)`` when ``obj`` is a ``datetime.datetime``.

    Raises:
        TypeError: For any other type of object.
    """
    if not isinstance(obj, datetime.datetime):
        raise TypeError('Unable to serialize {!r}'.format(obj))
    return str(obj)
# json.dumps pre-bound with default_json as its ``default`` hook, i.e. the
# fallback json calls for objects it cannot serialize itself.
json_dumps = functools.partial(json.dumps, default=default_json)
# aiohttp's web.json_response pre-bound to serialize bodies with json_dumps.
json_response = functools.partial(web.json_response, dumps=json_dumps)
from jsonrpcserver.aio import AsyncMethods
from ..lib.structs import MethodRegistration
from ..constants import ROLES
from ..log import logger
import jsonrpcserver
# jsonrpcserver settings: request and response logging off, trim_log_values on.
# NOTE(review): trim_log_values presumably abbreviates logged payloads --
# confirm against the jsonrpcserver configuration docs.
jsonrpcserver.config.log_requests = False
jsonrpcserver.config.log_responses = False
jsonrpcserver.config.trim_log_values = True
class AsyncRPCMethods(AsyncMethods):
    """``AsyncMethods`` collection that also records a registration per method.

    Each call to :meth:`add_method` stores a ``MethodRegistration`` (method
    name, role, options) in ``self._roles`` keyed by the method name, and
    stores the handler in the collection itself under the same name.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # method name -> MethodRegistration
        self._roles = dict()

    def add_method(self, handler, options=None, *args, **kwargs):
        """Register ``handler`` and remember its role and options.

        Args:
            handler: Callable to expose. Its ``__name__`` is the method name
                unless ``name`` is given.
            options: Options stored on the registration. When omitted (or
                None) a new empty dict is used for each call, so
                registrations never share one mutable default.
            *args: Accepted for signature compatibility; unused.
            **kwargs: ``name`` overrides the method name and ``role`` sets the
                role (default None). Any other keys are ignored.
        """
        if options is None:
            options = {}
        # Only fall back to handler.__name__ when no name was supplied, so
        # callables without __name__ (e.g. functools.partial objects) can be
        # registered with an explicit name.
        if 'name' in kwargs:
            method_name = kwargs.pop('name')
        else:
            method_name = handler.__name__
        role = kwargs.pop('role', None)
        self._roles[method_name] = MethodRegistration(
            method=method_name, role=role, options=options)
        self[method_name] = handler
import structlog
import uvloop
from aiohttp import web
from jsonrpcserver import config
from jsonrpcserver.async_methods import AsyncMethods
from sqlalchemy.engine.url import make_url
from .methods.account_history_api.methods import get_ops_in_block
from .methods.account_history_api.methods import get_account_history
# Install uvloop's event loop policy for asyncio at import time.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Switch off jsonrpcserver's response and request logging.
config.log_responses = False
config.log_requests = False
# Module-level structlog logger named after this module.
logger = structlog.get_logger(__name__)
# pylint: disable=redefined-outer-name
def default_json(obj):
    """Fallback serializer: render ``datetime.datetime`` values via ``str()``.

    Args:
        obj: The object to convert.

    Returns:
        ``str(obj)`` when ``obj`` is a ``datetime.datetime``.

    Raises:
        TypeError: For any other type of object.
    """
    if not isinstance(obj, datetime.datetime):
        raise TypeError('Unable to serialize {!r}'.format(obj))
    return str(obj)
# json.dumps pre-bound with default_json as its ``default`` hook, i.e. the
# fallback json calls for objects it cannot serialize itself.
json_dumps = functools.partial(json.dumps, default=default_json)
# aiohttp's web.json_response pre-bound to serialize bodies with json_dumps.
json_response = functools.partial(web.json_response, dumps=json_dumps)