Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_progress_updates_error(self, ftb):
config = TorConfig()
ep = TCPHiddenServiceEndpoint(self.reactor, config, 123)
self.assertTrue(IProgressProvider.providedBy(ep))
prog = IProgressProvider(ep)
def boom(*args, **kw):
raise RuntimeError("the bad stuff")
prog.add_progress_listener(boom)
args = (50, "blarg", "Doing that thing we talked about.")
# kind-of cheating, test-wise?
ep._tor_progress_update(*args)
# if we ignore the progress-listener error: success
def test_explicit_data_dir(self, ftb):
with util.TempDir() as tmp:
d = str(tmp)
with open(os.path.join(d, 'hostname'), 'w') as f:
f.write('public')
ep = TCPHiddenServiceEndpoint(self.reactor, self.config, 123, d)
# make sure listen() correctly configures our hidden-serivce
# with the explicit directory we passed in above
yield ep.listen(NoOpProtocolFactory())
self.assertEqual(1, len(self.config.HiddenServices))
self.assertEqual(self.config.HiddenServices[0].dir, d)
self.assertEqual(self.config.HiddenServices[0].hostname, 'public')
def test_illegal_arg_ephemeral_auth(self, ftb):
# XXX I think Tor does actually support this now?
reactor = Mock()
config = Mock()
with self.assertRaises(ValueError) as ctx:
TCPHiddenServiceEndpoint(
reactor, config, 80,
stealth_auth=['alice', 'bob'],
ephemeral=True,
)
self.assertIn(
"onion services don't support stealth_auth",
str(ctx.exception),
)
def test_illegal_arg_ephemeral_hsdir(self, ftb):
reactor = Mock()
config = Mock()
with self.assertRaises(ValueError) as ctx:
TCPHiddenServiceEndpoint(
reactor, config, 80,
ephemeral=True,
hidden_service_dir='/tmp/foo',
)
self.assertIn(
"Specifying 'hidden_service_dir' is incompatible",
str(ctx.exception),
)
def _test_already_bootstrapped(self, ftb):
self.config.bootstrap()
ep = TCPHiddenServiceEndpoint(self.reactor, self.config, 123)
d = ep.listen(NoOpProtocolFactory())
self.protocol.commands[0][1].callback("ServiceID=gobbledegook\nPrivateKey=seekrit")
self.protocol.events['HS_DESC'](
"UPLOAD gobbledegook basic somedirauth REASON=testing"
)
self.protocol.events['HS_DESC'](
"UPLOADED gobbledegook basic somedirauth REASON=testing"
)
return d
def test_illegal_arg_torconfig(self, ftb):
class Foo(object):
pass
config = Foo()
ep = TCPHiddenServiceEndpoint(self.reactor, config, 123)
factory = Mock()
with self.assertRaises(ValueError) as ctx:
yield ep.listen(factory)
self.assertIn(
"Expected a TorConfig instance but",
str(ctx.exception)
)
def getHSEndpoint(data_dir):
if LooseVersion(txtorcon_version) >= LooseVersion('0.10.0'):
return TCPHiddenServiceEndpoint(reactor,
torconfig,
80,
hidden_service_dir=data_dir)
else:
return TCPHiddenServiceEndpoint(reactor,
torconfig,
80,
data_dir=data_dir)
def getHSEndpoint(data_dir):
if LooseVersion(txtorcon_version) >= LooseVersion('0.10.0'):
return TCPHiddenServiceEndpoint(reactor,
torconfig,
80,
hidden_service_dir=data_dir)
else:
return TCPHiddenServiceEndpoint(reactor,
torconfig,
80,
data_dir=data_dir)