How to use the txtorcon.testutil.FakeControlProtocol class in txtorcon

To help you get started, we’ve selected a few txtorcon examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github meejah / txtorcon / test / test_onion.py View on Github external
def test_ephemeral_remove_not_ok(self):
        """
        Begin creating an ephemeral onion service against a fake control
        protocol, answer its ADD_ONION command and deliver six HS_DESC
        "UPLOAD" lines to the registered listener.

        NOTE(review): the name suggests a failing removal is exercised,
        but no removal is visible in this excerpt -- confirm against the
        complete test.
        """
        # fake protocol built from an empty list; whatever gets sent to it
        # is read back from ``protocol.commands`` further down
        protocol = FakeControlProtocol([])
        config = TorConfig(protocol)

        # result of create() is kept but not used within this excerpt
        eph_d = EphemeralOnionService.create(
            Mock(),
            config,
            ports=["80 127.0.0.1:80"],
        )

        # first recorded entry: the command text plus the object used to
        # answer it
        cmd, d = protocol.commands[0]
        self.assertEqual(u"ADD_ONION NEW:BEST Port=80,127.0.0.1:80", cmd)

        # reply to the ADD_ONION command with a key blob and a service id
        d.callback("PrivateKey={}\nServiceID={}".format(_test_private_key_blob, _test_onion_id))
        # listener stored under the 'HS_DESC' event name
        cb = protocol.events['HS_DESC']

        # one UPLOAD line per made-up directory name, hsdir_0 .. hsdir_5
        for x in range(6):
            cb('UPLOAD {} UNKNOWN hsdir_{}'.format(_test_onion_id, x))
github meejah / txtorcon / test / test_onion.py View on Github external
def setUp(self):
        """
        Prepare the per-test fixture: a freshly made directory, a TorConfig
        wrapping a fake control protocol, and a
        FilesystemAuthenticatedOnionService (kept on ``self.hs``) built
        from both.
        """
        scratch_dir = self.mktemp()
        self.thedir = scratch_dir
        os.mkdir(scratch_dir)

        fake_proto = FakeControlProtocol([])
        self.config = TorConfig(fake_proto)

        # gather the constructor arguments first, then build the service
        service_kwargs = dict(
            config=self.config,
            thedir=self.thedir,
            ports=["80 127.0.0.1:1234"],
            auth=AuthBasic(['foo', 'bar']),
        )
        self.hs = FilesystemAuthenticatedOnionService(**service_kwargs)
github meejah / txtorcon / test / test_onion.py View on Github external
def test_old_tor_version(self):
        """
        Request a filesystem onion service while the fake protocol reports
        version "0.1.2.3", then wait on whatever ``create`` returns.
        """
        fake_proto = FakeControlProtocol([])
        # the version must be in place before the config wraps the protocol
        fake_proto.version = "0.1.2.3"
        tor_config = TorConfig(fake_proto)
        service_dir = self.mktemp()

        def my_progress(a, b, c):
            # progress updates are deliberately ignored
            return None

        yield FilesystemOnionService.create(
            Mock(),
            tor_config,
            service_dir,
            ports=["80 127.0.0.1:80"],
            progress=my_progress,
        )
github meejah / txtorcon / test / test_onion.py View on Github external
def test_ephemeral_v3_ip_addr_tuple(self):
        """
        A ``(port, "ip:port")`` tuple in ``ports``, together with
        ``detach=True`` and ``version=3``, produces the expected ADD_ONION
        command on the fake protocol.
        """
        fake_proto = FakeControlProtocol([])
        tor_config = TorConfig(fake_proto)

        # create() hands back a Deferred; this test never waits on it
        EphemeralOnionService.create(
            Mock(),
            tor_config,
            ports=[(80, "192.168.1.2:80")],
            detach=True,
            version=3,
        )

        first_entry = fake_proto.commands[0]
        sent_command, pending_reply = first_entry
        expected_command = u"ADD_ONION NEW:ED25519-V3 Port=80,192.168.1.2:80 Flags=Detach"
        self.assertEqual(expected_command, sent_command)

        # answer the command with a key blob and a service id
        reply_text = "PrivateKey={}\nServiceID={}".format(_test_private_key_blob, _test_onion_id)
        pending_reply.callback(reply_text)
github meejah / txtorcon / test / test_controller.py View on Github external
def test_dns_resolve_default_socksport(self, fake_socks):
        """
        ``Tor.dns_resolve`` yields the value carried by
        ``fake_socks.resolve`` when the fake protocol is built with a
        "DEFAULT" SocksPort entry.

        :param fake_socks: stand-in whose ``resolve`` attribute is replaced
            below (presumably injected by a patch decorator that is not
            visible in this excerpt -- TODO confirm).
        """
        answer = object()  # unique sentinel, compared by the final assert
        cfg = Mock()
        from txtorcon.testutil import FakeControlProtocol
        # items handed to the fake protocol (presumably its canned replies
        # -- verify against FakeControlProtocol)
        proto = FakeControlProtocol([
            {"SocksPort": "DEFAULT"},
            "9050",
        ])
        # NOTE(review): bare attribute access whose result is discarded;
        # it has no visible effect in this excerpt.
        proto.answers
        tor = Tor(Mock(), proto, _tor_config=cfg)
        # resolve() will return defer.succeed(answer), i.e. the sentinel
        # wrapped by ``defer.succeed``
        fake_socks.resolve = Mock(return_value=defer.succeed(answer))
        ans = yield tor.dns_resolve("meejah.ca")
        self.assertEqual(ans, answer)
github meejah / txtorcon / test / test_onion.py View on Github external
def test_ephemeral_ports_bad0(self):
        """
        Passing ``ports`` as a bare string rather than a list makes
        ``EphemeralAuthenticatedOnionService.create`` raise ValueError with
        a message explaining the expected type.
        """
        fake_proto = FakeControlProtocol([])
        tor_config = TorConfig(fake_proto)

        with self.assertRaises(ValueError) as caught:
            yield EphemeralAuthenticatedOnionService.create(
                Mock(),
                tor_config,
                ports="80 127.0.0.1:80",
                auth=AuthBasic(["xavier"]),
            )

        error_text = str(caught.exception)
        self.assertIn("'ports' must be a list of strings", error_text)
github meejah / txtorcon / test / test_onion.py View on Github external
def test_ephemeral_auth_basic_remove_fails(self):
        protocol = FakeControlProtocol([])
        config = TorConfig(protocol)

        eph_d = EphemeralAuthenticatedOnionService.create(
            Mock(),
            config,
            ports=["80 127.0.0.1:80"],
            auth=AuthBasic([
                "steve",
                ("carol", "c4r0ls33kr1t"),
            ]),
        )
        cmd, d = protocol.commands[0]
        self.assertTrue(
            cmd.startswith(
                u"ADD_ONION NEW:BEST Port=80,127.0.0.1:80 Flags=BasicAuth "
            )
github meejah / txtorcon / test / test_onion.py View on Github external
def test_prop224_private_key(self):
        """
        Pre-populate a service directory with a secret-key file and a
        hostname file, then request a version-3 filesystem onion service
        rooted in that directory.
        """
        fake_proto = FakeControlProtocol([])
        config = TorConfig(fake_proto)
        hsdir = self.mktemp()
        os.mkdir(hsdir)

        # four arbitrary bytes stand in for the key material
        key_path = join(hsdir, 'hs_ed25519_secret_key')
        with open(key_path, 'wb') as key_file:
            key_file.write(b'\x01\x02\x03\x04')

        hostname_path = join(hsdir, 'hostname')
        with open(hostname_path, 'w') as hostname_file:
            hostname_file.write(u'{}.onion'.format(_test_onion_id))

        create_kwargs = dict(
            hsdir=hsdir,
            ports=["80 127.0.0.1:4321"],
            version=3,
        )
        hs_d = FilesystemOnionService.create(Mock(), config, **create_kwargs)
github meejah / txtorcon / test / test_onion.py View on Github external
def test_ephemeral_extra_kwargs(self):
        """
        Constructing an EphemeralOnionService with unexpected keyword
        arguments raises ValueError whose text contains "Unknown kwarg".
        """
        fake_proto = FakeControlProtocol([])
        tor_config = TorConfig(fake_proto)

        # keyword arguments passed through to the constructor unchanged
        constructor_kwargs = dict(
            ports=["80 127.0.0.1:80"],
            ver=2,
            something_funny="foo",
        )

        with self.assertRaises(ValueError) as caught:
            EphemeralOnionService(tor_config, **constructor_kwargs)

        error_text = str(caught.exception)
        self.assertIn("Unknown kwarg", error_text)
github meejah / txtorcon / test / test_onion.py View on Github external
def test_dir_ioerror_v3(self):
        protocol = FakeControlProtocol([])
        config = TorConfig(protocol)
        hsdir = self.mktemp()
        os.mkdir(hsdir)
        with open(join(hsdir, "hostname"), "w") as f:
            f.write('{}.onion'.format(_test_onion_id))

        hs_d = FilesystemOnionService.create(
            Mock(),
            config,
            hsdir=hsdir,
            ports=["80 127.0.0.1:4321"],
            version=3,
        )

        # arrange HS_DESC callbacks so we get the hs instance back
        cb = protocol.events['HS_DESC']