Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_descriptor_wait(self):
eph = torconfig.EphemeralHiddenService(["80 127.0.0.1:80"])
proto = Mock()
proto.queue_command = Mock(return_value=defer.succeed("PrivateKey=blam\nServiceID=ohai\n"))
eph.add_to_tor(proto)
# get the event-listener callback that torconfig code added;
# the first call [0] was to add_event_listener; we want the
# [1] arg of that
cb = proto.method_calls[0][1][1]
# Tor doesn't actually provide the .onion, but we can test it anyway
cb('UPLOADED ohai UNKNOWN somehsdir')
cb('UPLOADED UNKNOWN UNKNOWN somehsdir')
self.assertEqual("blam", eph.private_key)
self.assertEqual("ohai.onion", eph.hostname)
def test_wrong_blob(self):
wrong_blobs = ["", " ", "foo", ":", " : ", "foo:", ":foo", 0]
for b in wrong_blobs:
try:
torconfig.EphemeralHiddenService(["80 localhost:80"], b)
self.fail("should get exception")
except ValueError:
pass
def test_single_failed_upload(self):
eph = torconfig.EphemeralHiddenService("80 127.0.0.1:80")
proto = Mock()
proto.queue_command = Mock(return_value=defer.succeed("PrivateKey=seekrit\nServiceID=42\n"))
d = eph.add_to_tor(proto)
# get the event-listener callback that torconfig code added;
# the last call [-1] was to add_event_listener; we want the
# [1] arg of that
cb = proto.method_calls[-1][1][1]
# Tor leads with UPLOAD events for each attempt; we queue 2 of
# these...
cb('UPLOAD 42 UNKNOWN hsdir0')
cb('UPLOAD 42 UNKNOWN hsdir1')
# ...then fail one
def test_add_keyblob(self):
eph = torconfig.EphemeralHiddenService("80 127.0.0.1:80", "alg:blam")
proto = Mock()
proto.queue_command = Mock(return_value="ServiceID=ohai")
eph.add_to_tor(proto)
self.assertEqual("alg:blam", eph.private_key)
self.assertEqual("ohai.onion", eph.hostname)
def test_add_keyblob(self):
eph = torconfig.EphemeralHiddenService(["80 127.0.0.1:80"], "alg:blam")
proto = Mock()
proto.queue_command = Mock(return_value="ServiceID=ohai")
eph.add_to_tor(proto)
self.assertEqual("alg:blam", eph.private_key)
self.assertEqual("ohai.onion", eph.hostname)
def test_defaults(self):
eph = torconfig.EphemeralHiddenService("80 localhost:80")
self.assertEqual(eph._ports, ["80,localhost:80"])
def test_failed_upload(self):
eph = torconfig.EphemeralHiddenService("80 127.0.0.1:80")
proto = Mock()
proto.queue_command = Mock(return_value=defer.succeed("PrivateKey=seekrit\nServiceID=42\n"))
d = eph.add_to_tor(proto)
# get the event-listener callback that torconfig code added;
# the last call [-1] was to add_event_listener; we want the
# [1] arg of that
cb = proto.method_calls[-1][1][1]
# Tor leads with UPLOAD events for each attempt; we queue 2 of
# these...
cb('UPLOAD 42 UNKNOWN hsdir0')
cb('UPLOAD 42 UNKNOWN hsdir1')
# ...but fail them both
def test_failed_upload(self):
eph = torconfig.EphemeralHiddenService(["80 127.0.0.1:80"])
proto = Mock()
proto.queue_command = Mock(return_value=defer.succeed("PrivateKey=seekrit\nServiceID=42\n"))
d = eph.add_to_tor(proto)
# get the event-listener callback that torconfig code added;
# the first call [0] was to add_event_listener; we want the
# [1] arg of that
cb = proto.method_calls[0][1][1]
# Tor leads with UPLOAD events for each attempt; we queue 2 of
# these...
cb('UPLOAD 42 UNKNOWN hsdir0')
cb('UPLOAD 42 UNKNOWN hsdir1')
# ...but fail them both
self.torconfig = txtorcon.TorConfig(control=self.tor.protocol)
yield self.torconfig.post_bootstrap
hs_strings = []
if len(self.onion_unix_socket) == 0:
local_socket_endpoint_desc = "tcp:interface=%s:%s" % (self.onion_tcp_interface_ip, self.onion_tcp_port)
else:
local_socket_endpoint_desc = "unix:%s" % self.onion_unix_socket
onion_service_endpoint = endpoints.serverFromString(self.reactor, local_socket_endpoint_desc)
datagram_proxy_factory = OnionDatagramProxyFactory(received_handler=lambda x: self.datagram_received(x))
yield onion_service_endpoint.listen(datagram_proxy_factory)
if len(self.onion_unix_socket) == 0:
hs_strings.append("%s %s:%s" % (self.onion_port, self.onion_tcp_interface_ip, self.onion_tcp_port))
else:
hs_strings.append("%s unix:%s" % (self.onion_port, self.onion_unix_socket))
hs = txtorcon.torconfig.EphemeralHiddenService(hs_strings, key_blob_or_type=self.onion_key)
yield hs.add_to_tor(self.tor.protocol)
args = []
directories = []
for (key, value) in self.unsaved.items():
if key == 'HiddenServices':
self.config['HiddenServices'] = value
# using a list here because at least one unit-test
# cares about order -- and conceivably order *could*
# matter here, to Tor...
services = list()
# authenticated services get flattened into the HiddenServices list...
for hs in value:
if IOnionClient.providedBy(hs):
parent = IOnionClient(hs).parent
if parent not in services:
services.append(parent)
elif isinstance(hs, (EphemeralOnionService, EphemeralHiddenService)):
raise ValueError(
"Only filesystem based Onion services may be added"
" via TorConfig.hiddenservices; ephemeral services"
" must be created with 'create_onion_service'."
)
else:
if hs not in services:
services.append(hs)
for hs in services:
for (k, v) in hs.config_attributes():
if k == 'HiddenServiceDir':
if v not in directories:
directories.append(v)
args.append(k)
args.append(v)