Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_progress_updates(self, ftb):
    """Check that a progress listener added to the endpoint gets invoked.

    The endpoint is adapted to ``IProgressProvider``, a ``Mock`` is
    registered as a listener, and one progress update is pushed through
    the endpoint's private ``_tor_progress_update`` hook.

    :param ftb: not used in the body.
        NOTE(review): presumably injected by a patch decorator that this
        excerpt does not show -- confirm.
    """
    config = TorConfig()
    ep = TCPHiddenServiceEndpoint(self.reactor, config, 123)

    self.assertTrue(IProgressProvider.providedBy(ep))
    prog = IProgressProvider(ep)
    ding = Mock()
    prog.add_progress_listener(ding)
    args = (50, "blarg", "Doing that thing we talked about.")
    # kind-of cheating, test-wise?
    ep._tor_progress_update(*args)
    # ``Mock`` has no ``called_with`` assertion. On older Pythons the
    # attribute is auto-created and truthy, so the previous check could
    # never fail; newer Pythons raise AttributeError for it. Use the real
    # ``called`` flag so the test actually verifies the listener fired.
    # NOTE(review): tighten to ``ding.assert_called_with(*args)`` if the
    # endpoint forwards the update arguments unchanged -- confirm.
    self.assertTrue(ding.called)
# NOTE(review): this is the body of a function whose header is not part
# of this excerpt; `reactor`, `on_progress` and `res` come from outside.
# Wait for txtorcon.launch(); on_progress is handed in as its
# progress_updates callback.
tor = yield txtorcon.launch(reactor, progress_updates=on_progress)
# an endpoint that'll listen on port 80, and put the hostname +
# private_key files in './hidden_service_dir'
# NOTE: you should put these somewhere you've thought about more
# and made proper permissions for the parent directory, etc. A
# good choice for a system-wide Tor is /var/lib/tor/
hs_dir = './hidden_service_dir'
print("Creating hidden-service, keys in: {}".format(hs_dir))
ep = tor.create_onion_disk_endpoint(80, hs_dir=hs_dir, group_readable=True)
print("Note: descriptor upload can take several minutes")
# The same on_progress callback is also registered on the endpoint itself.
txtorcon.IProgressProvider(ep).add_progress_listener(on_progress)
# Start listening with a web Site built around `res`.
port = yield ep.listen(server.Site(res))
print("Private key:\n{}".format(port.getHost().onion_key))
print("Site listening: {}".format(port.getHost()))
yield defer.Deferred() # wait forever
def main(reactor):
    """Serve a single static page through an ``"onion:80"`` endpoint.

    Builds a resource tree holding one static page, creates a server
    endpoint from the ``"onion:80"`` description, prints every progress
    message, starts listening and prints the resulting port. The final
    Deferred never fires, so the function does not finish on its own.

    NOTE(review): the body uses ``yield``, so this is presumably wrapped
    by an inlineCallbacks-style decorator that this excerpt does not
    show -- confirm.

    :param reactor: the reactor supplied by ``react``.
    """
    root = resource.Resource()
    # Twisted's web resources operate on bytes under Python 3: both the
    # child path segment and the response body must be ``bytes``. These
    # literals are ordinary ``str`` on Python 2, so nothing changes there.
    root.putChild(b'', static.Data(
        b"Hello, hidden-service world!",
        'text/html')
    )
    ep = endpoints.serverFromString(reactor, "onion:80")
    txtorcon.IProgressProvider(ep).add_progress_listener(
        lambda percent, tag, msg: print(msg)
    )
    port = yield ep.listen(server.Site(root))
    print("Our address {}".format(port))
    yield defer.Deferred()  # wait forever; this Deferred never fires


react(main)
# NOTE(review): this is the body of a function whose header is not part
# of this excerpt; `reactor` and `Simple` come from outside.
# Connect to a Tor control port over TCP at localhost:9351.
tor = yield txtorcon.connect(
reactor,
endpoints.TCP4ClientEndpoint(reactor, "localhost", 9351),
)
# The `if False:` arm never runs; it only shows the equivalent
# create_onion_endpoint() call. The endpoint actually used is the one
# built from the description string in the `else:` arm.
if False:
ep = tor.create_onion_endpoint(
80,
version=3,
single_hop=True,
)
else:
ep = endpoints.serverFromString(reactor, "onion:80:version=3:singleHop=true")
# Progress listener: prints the percentage zero-padded to three digits,
# followed by the message (the tag is ignored).
def on_progress(percent, tag, msg):
print('%03d: %s' % (percent, msg))
txtorcon.IProgressProvider(ep).add_progress_listener(on_progress)
port = yield ep.listen(server.Site(Simple()))
print("Private key:\n{}".format(port.getHost().onion_key))
hs = port.onion_service
print("Site on http://{}".format(hs.hostname))
yield defer.Deferred() # wait forever
def main(reactor):
    """Run an authenticated onion service on port 80 and list its clients.

    Connects to a Tor control port at localhost:9251, creates an
    authenticated onion endpoint with two ``AuthBasic`` clients, prints
    progress while listening starts, then prints the private key, one line
    per client, and some debugging details of the service object. The
    final Deferred never fires, so the function does not finish on its own.

    :param reactor: the reactor used for the control connection.
    """
    control_endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 9251)
    tor = yield txtorcon.connect(reactor, control_endpoint)

    clients = [
        ("alice", "0GaFhnbunp0TxZuBhejhxg"),
        "bob",
    ]
    onion_ep = tor.create_authenticated_onion_endpoint(
        80,
        auth=AuthBasic(clients),
    )

    def report_progress(percent, tag, msg):
        # Zero-padded percentage followed by the message; tag is ignored.
        print('%03d: %s' % (percent, msg))

    txtorcon.IProgressProvider(onion_ep).add_progress_listener(report_progress)
    print("Note: descriptor upload can take several minutes")

    listening = yield onion_ep.listen(server.Site(Simple()))
    print("Private key:\n{}".format(listening.getHost().onion_key))

    service = listening.onion_service
    print("Clients:")
    for client_name in service.client_names():
        hostname = service.get_client(client_name).hostname
        token = service.get_client(client_name).auth_token
        print(" {}: username={} token={}".format(hostname, client_name, token))

    # Debugging output describing the service object.
    print("hs {}".format(service))
    print(type(service))
    print(dir(service))

    yield defer.Deferred()  # wait forever
def progress(percent, tag, message):
    """Log a ten-slot text progress bar followed by ``message`` at debug level.

    One ``#`` is drawn per ten percent; the remaining slots are ``.``.
    ``tag`` is accepted but not used.
    """
    filled = int(percent / 10)
    hashes = '#' * filled
    dots = '.' * (10 - filled)
    log.debug('[%s%s] %s' % (hashes, dots, message))
def setup_complete(port):
    """Record and log the onion URI once the service is listening.

    Adapts ``port`` to ``txtorcon.IHiddenService``, stores
    ``"http://<onion_uri>"`` on ``self.uri`` (``self`` comes from the
    enclosing scope), then logs the advertised URI and the local address.
    """
    service = txtorcon.IHiddenService(port)
    onion_uri = service.getHost().onion_uri
    self.uri = "http://%s" % (onion_uri)
    log.info('I have set up a hidden service, advertised at: %s'
             % self.uri)
    # Looked up only after the first message has been logged.
    local_host = service.local_address.getHost()
    log.info('locally listening on %s' % local_host)
def setup_failed(args):
    """Log a failed onion-service setup at error level.

    Returns ``None``; the failure passed in is not re-raised.
    """
    message = 'onion service setup FAILED: %r' % args
    log.error(message)
# NOTE(review): tail of a function whose header is not part of this
# excerpt; `reactor` and `factory` come from outside.
# Build a server endpoint from the 'onion:80' description string.
endpoint = endpoints.serverFromString(reactor, 'onion:80')
# Register `progress` as a progress listener on the endpoint.
txtorcon.IProgressProvider(endpoint).add_progress_listener(progress)
# Start listening with `factory`. The errback is attached after the
# callback, so setup_failed also receives errors raised by setup_complete.
d = endpoint.listen(factory)
d.addCallback(setup_complete)
d.addErrback(setup_failed)
# Hand the Deferred back to the caller.
return d
def main(reactor):
    """Run a version-3 onion service on port 80 backed by a directory.

    Connects to a Tor control port at localhost:9251, prints the value of
    ``default_control_port()``, creates a filesystem onion endpoint rooted
    at ``./test_prop224_service``, prints progress while listening starts,
    then prints the listening address and the private key. The final
    Deferred never fires, so the function does not finish on its own.

    :param reactor: the reactor used for the control connection.
    """
    control_endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 9251)
    tor = yield txtorcon.connect(reactor, control_endpoint)
    print(default_control_port())

    onion_ep = tor.create_filesystem_onion_endpoint(
        80,
        "./test_prop224_service",
        version=3,
    )

    def report_progress(percent, tag, msg):
        # Zero-padded percentage followed by the message; tag is ignored.
        print('%03d: %s' % (percent, msg))

    txtorcon.IProgressProvider(onion_ep).add_progress_listener(report_progress)
    print("Note: descriptor upload can take several minutes")

    listening = yield onion_ep.listen(server.Site(Simple()))
    onion_address = listening.getHost()
    print("Site listening: {}".format(onion_address))
    print("Private key:\n{}".format(listening.getHost().onion_key))

    yield defer.Deferred()  # wait forever
# To re-create a previous hidden-service, pass the private key
# blob you retrieved earlier via port.getHost().onion_key like so:
# ep = endpoints.serverFromString(reactor, "onion:80:privateKey=")
# Beware that you have to escape the ":" after RSA1024, because the
# endpoint-string syntax uses ":" to separate its arguments. To avoid
# the escaping, you can omit the "RSA1024:" part if you wish.
# ep = endpoints.serverFromString(reactor, "onion:80")
# this one should be "nyfnesplt3f5nvn3.onion"
# NOTE(review): this is the body of a function whose header is not part
# of this excerpt; `reactor` and `Simple` come from outside.
# Build the endpoint from a description string that carries the control
# port (filled in from default_control_port()) and an RSA1024 private key.
# NOTE(review): a private key literal is embedded in the source; this is
# only acceptable for a throwaway example key -- confirm.
ep = endpoints.serverFromString(reactor, "onion:80:controlPort={port}:privateKey=RSA1024\\:MIICWwIBAAKBgQDml0L1Btxe1QIs88mvKvcgAEd19bUorzMndfXXBbPt2y1lTjm+vGldJRCXb/RArfCb9F2q7IWL4ScuJBiUCqpKVG2aGK8yOxw4c5WKvnLW8MRf5+jAPlR3h7idBdrVGCY/9gXf9JzWfpIhMfFidM4Xq6VpzMvignss6FB6i9zhOwIDAQABAoGAXuWjVamUKabp9UwDFYbOGypiPmZ3Pp4TpErEeNBNAzdvUEDIPPnXNtEZKemWEMREwDnqDny2XSG0+SU7xDk7aQGTFxipo+NAl18QMW2XcBjWrIG5P0L9E+j58k5Nq6EEaMQ8G8X3hsnX7EwRqnJYOwUWUQ4emi6TvNScSMS251kCQQD6KJXltkSfwU3d5hOh37x3pOp4ZcpI6eKwwfgqP+1pVfOwjvXfLqLgLRf9+NtmG+cU5HRDwmf9rbJNCOE++11HAkEA6/mz/L+54NRk9tPN4vYfn969v7fz9CQndQUsTTrtArqtjg7baKts3ndagj+/itJfY6qV/OonN9XdntQXTWWGbQJAY244TmrJEfqieZ2WlhO49JFPRPWolpyoJvuiKSDpu6GXT8ky/zepM5OY4rDEe+yBR/OaJsihztn4cdgit4bvxwJAFsnZiOoXEFBSo8eWlXmBWlYPawlfxM8NBG8IdTjglKfkhNiIddZAQEe0dOmlHMnuLljV/UO7n9fGfEUtLutEDQJAC3c9gSe2of41TaZhQ+aHzQ8E9cs7fg3gXXUgWlocQK6fYq+tC0CF7dDmydShF8vI8oEcgJGhgtUAXQDwH9eD0A==".format(port=default_control_port()))
# Progress listener: prints the percentage zero-padded to three digits,
# followed by the message (the tag is ignored).
def on_progress(percent, tag, msg):
print('%03d: %s' % (percent, msg))
txtorcon.IProgressProvider(ep).add_progress_listener(on_progress)
print("Note: descriptor upload can take several minutes")
port = yield ep.listen(server.Site(Simple()))
print("Site listening: {}".format(port.getHost()))
print("Private key:\n{}".format(port.getHost().onion_key))
yield defer.Deferred() # wait forever