Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _try():
hint = convert_legacy_hint(location)
if ":" not in hint:
raise InvalidHintError("no colon")
hint_type = hint.split(":", 1)[0]
plugin = connectionPlugins.get(hint_type)
if not plugin:
connectionInfo._describe_connection_handler(location, None)
raise InvalidHintError("no handler registered")
connectionInfo._describe_connection_handler(location,
describe_handler(plugin))
_update_status("resolving hint")
d = defer.maybeDeferred(plugin.hint_to_endpoint, hint, reactor, _update_status)
def problem(f):
log.err(f, "error in hint_to_endpoint", level=UNUSUAL,
facility="foolscap.connection", umid="Fxtg6A")
# this hint will be ignored
return f
d.addErrback(problem)
return d
def _try():
hint = convert_legacy_hint(location)
if ":" not in hint:
raise InvalidHintError("no colon")
hint_type = hint.split(":", 1)[0]
plugin = connectionPlugins.get(hint_type)
if not plugin:
connectionInfo._describe_connection_handler(location, None)
raise InvalidHintError("no handler registered")
connectionInfo._describe_connection_handler(location,
describe_handler(plugin))
_update_status("resolving hint")
d = defer.maybeDeferred(plugin.hint_to_endpoint, hint, reactor, _update_status)
def problem(f):
log.err(f, "error in hint_to_endpoint", level=UNUSUAL,
facility="foolscap.connection", umid="Fxtg6A")
# this hint will be ignored
return f
d.addErrback(problem)
return d
return defer.maybeDeferred(_try)
def hint_to_endpoint(self, hint, reactor, update_status):
mo = HINT_RE.search(hint)
if not mo:
raise InvalidHintError("unrecognized hint, wanted TYPE:HOST:PORT")
host, port = mo.group(1), int(mo.group(2))
# note: txsockx does not expose a way to provide the reactor
ep = SOCKS5ClientEndpoint(host, port, self._proxy_endpoint)
return ep, host
def hint_to_endpoint(self, hint, reactor, update_status):
# Return (endpoint, hostname), where "hostname" is what we pass to the
# HTTP "Host:" header so a dumb HTTP server can be used to redirect us.
mo = HINT_RE.search(hint)
if not mo:
raise InvalidHintError("unrecognized I2P hint")
host, portnum = mo.group(1), int(mo.group(3)) if mo.group(3) else None
kwargs = self._kwargs
if not portnum and 'port' in kwargs:
portnum = kwargs.pop('port')
ep = SAMI2PStreamClientEndpoint.new(self._sam_endpoint, host, portnum, **kwargs)
return ep, host
def hint_to_endpoint(self, hint, reactor, update_status):
# Return (endpoint, hostname), where "hostname" is what we pass to the
# HTTP "Host:" header so a dumb HTTP server can be used to redirect us.
mo = HINT_RE.search(hint)
if not mo:
raise InvalidHintError("unrecognized I2P hint")
host, portnum = mo.group(1), int(mo.group(3)) if mo.group(3) else None
ep = SAMI2PStreamClientEndpoint.new(self._sam_endpoint, host, portnum)
return ep, host