Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_unsplit_idna_a_unicode_hostname():
parsed = urlparse(SNOWMAN_HOST)
assert parsed.unsplit(use_idna=True) == SNOWMAN_IDNA_HOST
def test_urlparse_an_invalid_authority_parses_port():
url = "http://foo:b@r@[::1]:80/get"
parsed = urlparse(url)
assert parsed.port == 80
assert parsed.userinfo == "foo:b@r"
assert parsed.hostname == "[::1]"
def url(self):
"""The parsed URL of the Request."""
return rfc3986.urlparse(self.full_url)
self.signer_address = web3.Web3.toChecksumAddress("0x" + web3.Web3.sha3(hexstr=signer_public_key.to_string().hex())[12:].hex())
# Instantiate Ethereum client
provider = web3.HTTPProvider(eth_rpc_endpoint)
self.web3 = web3.Web3(provider)
self.web3.eth.setGasPriceStrategy(rpc_gas_price_strategy)
# Get average block time for current network
latest = self.web3.eth.getBlock("latest")
times = [block.timestamp for block in list(map(lambda n: self.web3.eth.getBlock(n), range(latest.number, latest.number-20, -1)))]
diffs = list(map(operator.sub, times[1:], times[:-1]))
self.average_block_time = abs(reduce(lambda a, b: a+b, diffs) / len(diffs))
# Instantiate IPFS client
ipfs_rpc_endpoint = urlparse(ipfs_rpc_endpoint)
ipfs_scheme = ipfs_rpc_endpoint.scheme if ipfs_rpc_endpoint.scheme else "http"
ipfs_port = ipfs_rpc_endpoint.port if ipfs_rpc_endpoint.port else 5001
self.ipfs_client = ipfsapi.connect(urljoin(ipfs_scheme, ipfs_rpc_endpoint.hostname), ipfs_port)
# Get contract objects
self.mpe_contract = self._get_contract_object("MultiPartyEscrow.json")
self.token_contract = self._get_contract_object("SingularityNetToken.json")
self.registry_contract = self._get_contract_object("Registry.json")
if isinstance(url, bytes):
url = url.decode('utf8')
else:
url = str(url)
# Ignore any leading and trailing whitespace characters.
url = url.strip()
# Don't do any URL preparation for non-HTTP schemes like `mailto`,
# `data` etc to work around exceptions from `url_parse`, which
# handles RFC 3986 only.
if ':' in url and not url.lower().startswith('http'):
self.url = url
return
# Support for unicode domain names and paths.
try:
uri = rfc3986.urlparse(url)
if validate:
rfc3986.normalize_uri(url)
except rfc3986.exceptions.RFC3986Exception:
raise InvalidURL(f"Invalid URL {url!r}: URL is imporoper.")
if not uri.scheme:
error = (
"Invalid URL {0!r}: No scheme supplied. Perhaps you meant http://{0}?"
)
error = error.format(to_native_string(url, 'utf8'))
raise MissingScheme(error)
if not uri.host:
raise InvalidURL(f"Invalid URL {url!r}: No host supplied")
# In general, we want to try IDNA encoding the hostname if the string contains
# Instantiate Ethereum client
eth_rpc_endpoint = self._config.get("eth_rpc_endpoint", "https://mainnet.infura.io/v3/e7732e1f679e461b9bb4da5653ac3fc2")
provider = web3.HTTPProvider(eth_rpc_endpoint)
self.web3 = web3.Web3(provider)
self.web3.eth.setGasPriceStrategy(medium_gas_price_strategy)
# Get MPE contract address from config if specified; mostly for local testing
_mpe_contract_address = self._config.get("mpe_contract_address", None)
if _mpe_contract_address is None:
self.mpe_contract = MPEContract(self.web3)
else:
self.mpe_contract = MPEContract(self.web3, _mpe_contract_address)
# Instantiate IPFS client
ipfs_rpc_endpoint = self._config.get("ipfs_rpc_endpoint", "https://ipfs.singularitynet.io:80")
ipfs_rpc_endpoint = urlparse(ipfs_rpc_endpoint)
ipfs_scheme = ipfs_rpc_endpoint.scheme if ipfs_rpc_endpoint.scheme else "http"
ipfs_port = ipfs_rpc_endpoint.port if ipfs_rpc_endpoint.port else 5001
self.ipfs_client = ipfsapi.connect(urljoin(ipfs_scheme, ipfs_rpc_endpoint.hostname), ipfs_port)
# Get Registry contract address from config if specified; mostly for local testing
_registry_contract_address = self._config.get("registry_contract_address", None)
if _registry_contract_address is None:
self.registry_contract = get_contract_object(self.web3, "Registry.json")
else:
self.registry_contract = get_contract_object(self.web3, "Registry.json", _registry_contract_address)
self.account = Account(self.web3, config, self.mpe_contract)
def _get_base_grpc_channel(self, endpoint):
endpoint_object = urlparse(endpoint)
if endpoint_object.port is not None:
channel_endpoint = endpoint_object.hostname + ":" + str(endpoint_object.port)
else:
channel_endpoint = endpoint_object.hostname
if endpoint_object.scheme == "http":
return grpc.insecure_channel(channel_endpoint)
elif endpoint_object.scheme == "https":
return grpc.secure_channel(channel_endpoint, grpc.ssl_channel_credentials())
else:
raise ValueError('Unsupported scheme in service metadata ("{}")'.format(endpoint_object.scheme))
def setup_acme_client(s3_client, acme_directory_url, acme_account_key):
uri = rfc3986.urlparse(acme_account_key)
if uri.scheme == "file":
if uri.host is None:
path = uri.path
elif uri.path is None:
path = uri.host
else:
path = os.path.join(uri.host, uri.path)
with open(path) as f:
key = f.read()
elif uri.scheme == "s3":
# uri.path includes a leading "/"
response = s3_client.get_object(Bucket=uri.host, Key=uri.path[1:])
key = response["Body"].read()
else:
raise ValueError(
"Invalid acme account key: {!r}".format(acme_account_key)