Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@click.option(
    '--iface', default=None, help='Network interface to listen on')
def start_server(port, iface):
    # Start the server: require root, publish `iface` on SniffProtocol,
    # serve SniffProtocol through a SockJSMultiFactory under the name
    # 'sniff' on TCP `port`, then run the reactor.
    #
    # port:  TCP port handed to reactor.listenTCP.
    # iface: value stored on SniffProtocol._iface (None when the --iface
    #        option is not given).
    #
    # Exits the process with status 1 when not running as root or when
    # the port is rejected with OverflowError.
    #
    # NOTE(review): documented with comments rather than a docstring on
    # purpose. If this function is registered as a click command (only the
    # --iface option decorator is visible here), click would show a
    # docstring as the command's help text, changing the CLI output.
    euid = os.geteuid()
    if euid != 0:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("Error: tissue server must be run as root")
        sys.exit(1)

    SniffProtocol._iface = iface
    f = SockJSMultiFactory()
    f.addFactory(Factory.forProtocol(SniffProtocol), 'sniff')

    # Only the listen call is guarded: the handler's message is about an
    # out-of-range port, so an OverflowError raised while the reactor is
    # running must not be misreported as a port problem.
    try:
        reactor.listenTCP(port, f)
    except OverflowError:
        print("Error: Port must be in range 0-65535")
        # Failing to listen is an error; exit non-zero like the root check.
        sys.exit(1)

    reactor.run()
print self.__str__()
sys.exit(0)
if __name__ == '__main__':
    # Script entry point: parse the command line, create the Inspector with
    # the configured log file, and hand control to the reactor.
    options = Options()
    try:
        options.parseOptions()
    except usage.UsageError as errortext:
        # `except X as name` is valid on Python 2.6+ and Python 3; the
        # older `except X, name` spelling is a syntax error on Python 3.
        print('%s: %s' % (sys.argv[0], errortext))
        print('%s: Try --help for usage details.' % (sys.argv[0]))
        # A usage error is a failure, so report it with a non-zero status
        # (the original exited with 0, which callers read as success).
        sys.exit(1)
    i = Inspector(logfile=options['logfile'])
    reactor.run()
print('Adding kill to', name)
def add_death(self, name):
    """Print a line announcing that a death is being added for ``name``."""
    prefix = 'Adding death to'
    print(prefix, name)
def check_user(self, name, password):
    """Print the name/password pair being checked and return ``succeed()``.

    NOTE(review): the password is printed in clear text -- presumably
    acceptable because this is test scaffolding; confirm.
    NOTE(review): ``succeed`` is defined outside this view. If it is
    ``twisted.internet.defer.succeed``, it requires a ``result`` argument
    and the bare call below would raise ``TypeError`` -- confirm.
    """
    report = 'Checking user name/pass ({}, {})'.format(name, password)
    print(report)
    # TODO: pylint thinks this function should have an argument
    # check it out
    return succeed()  # pylint: disable=no-value-for-parameter
class TestFactory(StatsFactory):
    """A ``StatsFactory`` whose ``protocol`` attribute is ``TestServer``."""

    protocol = TestServer
# Listen on DEFAULT_PORT with a TestFactory built from 'marmelade', then
# start the reactor.
reactor.listenTCP(DEFAULT_PORT, TestFactory('marmelade'))
reactor.run()
print '15-10=%d' % (diff)
log = yield client.getStruct(1)
print 'Check log: %s' % (log.value)
reactor.stop()
if __name__ == '__main__':
    # Script entry point: open a Thrift Calculator client connection to
    # 127.0.0.1:9090 using the binary protocol factory.
    d = ClientCreator(
        reactor,
        TTwisted.ThriftClientProtocol,
        Calculator.Client,
        TBinaryProtocol.TBinaryProtocolFactory(),
    ).connectTCP("127.0.0.1", 9090)
    # First replace the connection result with its ``client`` attribute,
    # then pass that client to main().
    d.addCallback(lambda conn: conn.client).addCallback(main)
    reactor.run()
d.addCallback(printvalue, 'ProgrammingAnalogVariable1')
d = iface.get_point_async('Cool1')
d.addCallback(printvalue, 'Cool1')
d = iface.scrape_all()
d.addCallback(printvalue, 'all')
def end():
    """Stop the reactor."""
    reactor.stop()
# Schedule end() with a delay of 2 and run_tests with a delay of 0
# (seconds, per Twisted's callLater), then start the reactor.
reactor.callLater(2, end)
reactor.callLater(0, run_tests)
reactor.run()
# Calculate and append the authentication hash for the target network... if necessary
if NETWORK[_target]['LOCAL']['AUTH_ENABLED']:
_tmp_data = self.hashed_packet(NETWORK[_target]['LOCAL']['AUTH_KEY'], _tmp_data)
# Send the packet to all peers in the target IPSC
send_to_ipsc(_target, _tmp_data)
if __name__ == '__main__':
    # Script entry point: log the start-up banner, create a bridgeIPSC for
    # every enabled entry in NETWORK and bind it to that entry's local UDP
    # port, then start the reactor.
    logger.info('DMRlink \'bridge.py\' (c) 2013, 2014 N0MJS & the K0USY Group - SYSTEM STARTING...')
    for ipsc_network in NETWORK:
        # Entries whose LOCAL/ENABLED flag is false are left untouched.
        if not NETWORK[ipsc_network]['LOCAL']['ENABLED']:
            continue
        networks[ipsc_network] = bridgeIPSC(ipsc_network)
        reactor.listenUDP(
            NETWORK[ipsc_network]['LOCAL']['PORT'],
            networks[ipsc_network],
        )
    reactor.run()
def runReactor(self):
    """Register ``self.whenRunning`` to be called once the reactor is
    running, log a start-up message, and run the reactor.
    """
    # The reactor is imported inside the method, not at module level.
    from twisted.internet import reactor

    reactor.callWhenRunning(self.whenRunning)
    self.log.info("Starting reactor...")

    reactor.run()
def main(argv):
    """Parse ``argv``, pass the parsed arguments to ``run`` and start the reactor.

    The parser from ``ProfileCLI.make_parser()`` is extended with a
    required ``host`` positional and an optional integer ``port``
    positional that defaults to 25565.
    """
    cli = ProfileCLI.make_parser()
    cli.add_argument("host")
    cli.add_argument("port", nargs='?', default=25565, type=int)

    parsed = cli.parse_args(argv)
    run(parsed)

    reactor.run()
output = sys.stdout
if options.show_duration:
output = TimestampOutput(output)
results = ConsoleOutput(output=output,
show_tracebacks=options.show_tracebacks,
show_duration=options.show_duration,
verbose=options.verbose)
results = FailureCountingResultWrapper(results)
with open(options.config_file) as f:
descriptions = yaml.load(f)
checks = build_checks(descriptions)
if not options.validate:
reactor.callWhenRunning(run_checks, checks, pattern, results)
reactor.run()
if results.any_failed():
return 1
else:
return 0
def run(self):
    """Run the reactor."""
    reactor.run()