Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
logging.warn("ip found as 127.0.0.1, remote connections won't work.")
dealer = yield from aiozmq.create_zmq_stream(
zmq.DEALER,
bind='tcp://{}:{}'.format(ip, port))
addr = list(dealer.transport.bindings())[0]
logging.info("Server started on {}.".format(addr))
path = os.path.abspath(path)
logging.info("Watching files at {}".format(path))
@asyncio.coroutine
def _callback(fname):
dealer.write((fname.encode(),))
event_loop = asyncio.get_event_loop()
event_loop.create_task(watch_path(path, _callback, recursive=True))
file_counter = 0
call_counter = 0
output_handle = None
calls_per_file = 100000
fail_folder = os.path.join(outpath, '_FAILED')
if os.path.exists(fail_folder):
os.rmdir(fail_folder)
os.mkdir(fail_folder)
pass_folder = os.path.join(outpath, '_PASSED')
if os.path.exists(pass_folder):
os.rmdir(pass_folder)
os.mkdir(pass_folder)
if output is not None: