Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ephemeral_remove_not_ok(self):
protocol = FakeControlProtocol([])
config = TorConfig(protocol)
eph_d = EphemeralOnionService.create(
Mock(),
config,
ports=["80 127.0.0.1:80"],
)
cmd, d = protocol.commands[0]
self.assertEqual(u"ADD_ONION NEW:BEST Port=80,127.0.0.1:80", cmd)
d.callback("PrivateKey={}\nServiceID={}".format(_test_private_key_blob, _test_onion_id))
cb = protocol.events['HS_DESC']
for x in range(6):
cb('UPLOAD {} UNKNOWN hsdir_{}'.format(_test_onion_id, x))
def setUp(self):
self.thedir = self.mktemp()
os.mkdir(self.thedir)
protocol = FakeControlProtocol([])
self.config = TorConfig(protocol)
self.hs = FilesystemAuthenticatedOnionService(
config=self.config,
thedir=self.thedir,
ports=["80 127.0.0.1:1234"],
auth=AuthBasic(['foo', 'bar'])
)
def test_old_tor_version(self):
protocol = FakeControlProtocol([])
protocol.version = "0.1.2.3"
config = TorConfig(protocol)
hsdir = self.mktemp()
def my_progress(a, b, c):
pass
eph_d = FilesystemOnionService.create(
Mock(),
config,
hsdir,
ports=["80 127.0.0.1:80"],
progress=my_progress,
)
yield eph_d
def test_ephemeral_v3_ip_addr_tuple(self):
protocol = FakeControlProtocol([])
config = TorConfig(protocol)
# returns a Deferred we're ignoring
EphemeralOnionService.create(
Mock(),
config,
ports=[(80, "192.168.1.2:80")],
detach=True,
version=3,
)
cmd, d = protocol.commands[0]
self.assertEqual(u"ADD_ONION NEW:ED25519-V3 Port=80,192.168.1.2:80 Flags=Detach", cmd)
d.callback("PrivateKey={}\nServiceID={}".format(_test_private_key_blob, _test_onion_id))
def test_dns_resolve_default_socksport(self, fake_socks):
answer = object()
cfg = Mock()
from txtorcon.testutil import FakeControlProtocol
proto = FakeControlProtocol([
{"SocksPort": "DEFAULT"},
"9050",
])
proto.answers
tor = Tor(Mock(), proto, _tor_config=cfg)
fake_socks.resolve = Mock(return_value=defer.succeed(answer))
ans = yield tor.dns_resolve("meejah.ca")
self.assertEqual(ans, answer)
def test_ephemeral_ports_bad0(self):
protocol = FakeControlProtocol([])
config = TorConfig(protocol)
with self.assertRaises(ValueError) as ctx:
yield EphemeralAuthenticatedOnionService.create(
Mock(),
config,
ports="80 127.0.0.1:80",
auth=AuthBasic(["xavier"]),
)
self.assertIn(
"'ports' must be a list of strings",
str(ctx.exception),
)
def test_ephemeral_auth_basic_remove_fails(self):
protocol = FakeControlProtocol([])
config = TorConfig(protocol)
eph_d = EphemeralAuthenticatedOnionService.create(
Mock(),
config,
ports=["80 127.0.0.1:80"],
auth=AuthBasic([
"steve",
("carol", "c4r0ls33kr1t"),
]),
)
cmd, d = protocol.commands[0]
self.assertTrue(
cmd.startswith(
u"ADD_ONION NEW:BEST Port=80,127.0.0.1:80 Flags=BasicAuth "
)
def test_prop224_private_key(self):
protocol = FakeControlProtocol([])
config = TorConfig(protocol)
hsdir = self.mktemp()
os.mkdir(hsdir)
with open(join(hsdir, 'hs_ed25519_secret_key'), 'wb') as f:
f.write(b'\x01\x02\x03\x04')
with open(join(hsdir, 'hostname'), 'w') as f:
f.write(u'{}.onion'.format(_test_onion_id))
hs_d = FilesystemOnionService.create(
Mock(),
config,
hsdir=hsdir,
ports=["80 127.0.0.1:4321"],
version=3,
)
def test_ephemeral_extra_kwargs(self):
protocol = FakeControlProtocol([])
config = TorConfig(protocol)
with self.assertRaises(ValueError) as ctx:
EphemeralOnionService(
config,
ports=["80 127.0.0.1:80"],
ver=2,
something_funny="foo",
)
self.assertIn(
"Unknown kwarg",
str(ctx.exception),
)
def test_dir_ioerror_v3(self):
protocol = FakeControlProtocol([])
config = TorConfig(protocol)
hsdir = self.mktemp()
os.mkdir(hsdir)
with open(join(hsdir, "hostname"), "w") as f:
f.write('{}.onion'.format(_test_onion_id))
hs_d = FilesystemOnionService.create(
Mock(),
config,
hsdir=hsdir,
ports=["80 127.0.0.1:4321"],
version=3,
)
# arrange HS_DESC callbacks so we get the hs instance back
cb = protocol.events['HS_DESC']