Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@defer.inlineCallbacks
def test_quit_no_protocol(self):
    """Expect ``Tor.quit()`` to raise RuntimeError with no protocol set.

    Both ``_protocol`` and ``_process_protocol`` are cleared on a ``Tor``
    built from mocks; the resulting error message must mention
    'no protocol instance'.
    """
    tor = Tor(Mock(), Mock())
    tor._protocol = None
    tor._process_protocol = None
    # The body yields, so it has to run under inlineCallbacks; an undecorated
    # generator test method is never actually executed by the runner.
    with self.assertRaises(RuntimeError) as ctx:
        yield tor.quit()
    # assertIn reports both operands on failure, unlike assertTrue(x in y).
    self.assertIn('no protocol instance', str(ctx.exception))
@defer.inlineCallbacks
def test_create_state(self):
    """Smoke test: ``Tor.create_state()`` completes with TorState patched.

    ``txtorcon.controller.TorState`` is replaced by a mock for the duration
    of the call. There are deliberately no assertions.
    """
    tor = Tor(Mock(), Mock())
    with patch('txtorcon.controller.TorState') as ts:
        # Spelled 'post_bootstrap' to match the attribute name used
        # elsewhere in these tests; a Mock accepts a misspelled attribute
        # silently, so a typo here would simply have no effect.
        ts.post_bootstrap = defer.succeed('boom')
        # Requires inlineCallbacks (above) for this yield to ever run.
        yield tor.create_state()
    # no assertions; we're just testing that this doesn't raise
def setUp(self):
    """Create ``self.tor`` from mocks, keeping the mock config as ``self.cfg``."""
    control_proto = Mock()
    # Mark the mock as providing ITorControlProtocol before handing it to Tor.
    directlyProvides(control_proto, ITorControlProtocol)
    self.cfg = Mock()
    self.tor = Tor(Mock(), control_proto, _tor_config=self.cfg)
@defer.inlineCallbacks
def test_web_agent_defaults(self):
    """Request through ``Tor.web_agent()`` using only default arguments.

    Builds a ``Tor`` around a mocked reactor, config and control protocol,
    asks it for a web agent and checks that a GET yields
    ``self.expected_response``. ``self.pool`` and ``self.expected_response``
    are read here but set outside this method.
    """
    reactor = Mock()
    # XXX is there a faster way to do this? better reactor fake?
    fake_host = Mock()
    fake_host.port = 1234
    fake_port = Mock()
    fake_port.getHost = Mock(return_value=fake_host)
    reactor.listenTCP = Mock(return_value=fake_port)

    cfg = Mock()
    cfg.create_socks_endpoint = Mock(return_value=defer.succeed("9050"))
    proto = Mock()
    proto.get_conf = Mock(return_value=defer.succeed({}))
    directlyProvides(proto, ITorControlProtocol)

    tor = Tor(reactor, proto, _tor_config=cfg)
    try:
        agent = tor.web_agent(pool=self.pool)
    except ImportError as e:
        if 'IAgentEndpointFactory' in str(e):
            print("Skipping; appears we don't have web support")
            return
        # Any other ImportError is a real failure: re-raise it instead of
        # falling through to a confusing NameError on the unbound 'agent'.
        raise

    # Requires inlineCallbacks (above) for this yield to ever run.
    resp = yield agent.request(b'GET', b'meejah.ca')
    self.assertEqual(self.expected_response, resp)
@defer.inlineCallbacks
def test_web_agent_error(self):
    """Expect ValueError when ``socks_endpoint`` is an arbitrary object.

    A plain ``object()`` is passed as ``socks_endpoint``; the error message
    must contain "'socks_endpoint' should be". ``self.pool`` is read here
    but set outside this method.
    """
    reactor = Mock()
    cfg = Mock()
    proto = Mock()
    directlyProvides(proto, ITorControlProtocol)

    tor = Tor(reactor, proto, _tor_config=cfg)
    # Both calls stay inside the context manager: the ValueError may come
    # from either creating the agent or issuing the request.
    with self.assertRaises(ValueError) as ctx:
        agent = tor.web_agent(pool=self.pool, socks_endpoint=object())
        # Requires inlineCallbacks (above) for this yield to ever run.
        yield agent.request(b'GET', b'meejah.ca')
    self.assertIn("'socks_endpoint' should be", str(ctx.exception))
def setUp(self):
    """Create ``self.tor`` from a mock reactor and a mock control protocol."""
    control_proto = Mock()
    # Mark the mock as providing ITorControlProtocol before handing it to Tor.
    directlyProvides(control_proto, ITorControlProtocol)
    self.tor = Tor(Mock(), control_proto)
def setUp(self):
    """Create ``self.tor`` whose protocol reports a SocksPort of "9050".

    The mock config is kept on ``self.cfg``.
    """
    # get_conf returns an already-fired Deferred carrying the canned config.
    canned_conf = defer.succeed({"SocksPort": "9050"})
    control_proto = Mock()
    control_proto.get_conf = Mock(return_value=canned_conf)
    self.cfg = Mock()
    self.tor = Tor(Mock(), control_proto, _tor_config=self.cfg)
# NOTE(review): this is the body of a test whose `def` line and decorators are
# not visible here. `reactor` and `tpp` are not defined in this fragment;
# presumably they come from that missing header (e.g. a patched mock passed in
# as an argument) -- confirm against the full file.
config = TorConfig()
# Stand-in for bootstrap: fires config.post_bootstrap with the config itself.
def boot(arg=None):
config.post_bootstrap.callback(config)
# Written straight into the instance __dict__; presumably to bypass TorConfig's
# own attribute handling -- TODO confirm.
config.__dict__['bootstrap'] = Mock(side_effect=boot)
config.__dict__['attach_protocol'] = Mock(return_value=defer.succeed(None))
# Factory used as tpp's side effect: each call returns a fresh Mock whose
# post_bootstrap has already fired and whose when_connected() resolves to
# that same Mock.
def foo(*args, **kw):
rtn = Mock()
rtn.post_bootstrap = defer.succeed(None)
rtn.when_connected = Mock(return_value=defer.succeed(rtn))
return rtn
tpp.side_effect = foo
# The yield means the enclosing test must run as a generator-based coroutine
# (e.g. under defer.inlineCallbacks) -- decorator not visible here.
tor = yield launch(reactor, _tor_config=config)
self.assertTrue(isinstance(tor, Tor))
@defer.inlineCallbacks
def test_dns_resolve_ptr(self, fake_socks):
    """``Tor.dns_resolve_ptr()`` hands back whatever the socks layer resolves.

    NOTE(review): ``fake_socks`` is presumably injected by a ``patch``
    decorator that is not visible in this view; it must sit above
    ``inlineCallbacks`` -- confirm the patch target against the full file.
    """
    answer = object()
    proto = Mock()
    proto.get_conf = Mock(return_value=defer.succeed({"SocksPort": "9050"}))
    tor = Tor(Mock(), proto)
    fake_socks.resolve_ptr = Mock(return_value=defer.succeed(answer))
    # Requires inlineCallbacks (above) for this yield to ever run.
    ans = yield tor.dns_resolve_ptr("4.3.2.1")
    self.assertEqual(ans, answer)
def try_endpoint(control_ep):
    """Connect to ``control_ep`` and produce a ``Tor`` built on that connection.

    Generator-style coroutine: it yields Deferreds and delivers its result
    through ``returnValue``, so it is meant to be driven by something like
    ``inlineCallbacks`` (decorator not visible here). ``reactor`` and
    ``password_function`` are taken from the enclosing scope.

    :param control_ep: an object providing ``IStreamClientEndpoint``.
    """
    assert IStreamClientEndpoint.providedBy(control_ep)
    factory = TorProtocolFactory(password_function=password_function)
    control_proto = yield control_ep.connect(factory)
    # Load the configuration over the freshly connected protocol.
    tor_config = yield TorConfig.from_protocol(control_proto)
    returnValue(Tor(reactor, control_proto, _tor_config=tor_config))