Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _await_debugger_attach():
    """Enable ptvsd on the local DEBUG_PORT and block until a debugger attaches.

    A timer is armed for ``debugger_wait_time`` seconds (default 30) that
    calls ``_handle_detached`` if nothing attaches in time; it is cancelled
    as soon as ``ptvsd.wait_for_attach()`` returns.
    """
    debug_address = ('127.0.0.1', int(environ.get('DEBUG_PORT')))
    ptvsd.enable_attach(address=debug_address)
    # Announce on stdout that the process is ready for a debugger.
    # NOTE(review): presumably the launching tool watches for this marker - confirm.
    print(ATTACH_DEBUGGER_EVENT)
    wait_seconds = int(environ.get("debugger_wait_time", 30))
    attach_timeout = Timer(wait_seconds, _handle_detached)
    attach_timeout.start()
    ptvsd.wait_for_attach()
    attach_timeout.cancel()


def start():
    """Start the gRPC runner service and block until the listener thread ends.

    When the DEBUGGING environment variable is set, first waits for a
    debugger to attach. The server is then bound to an OS-assigned port on
    127.0.0.1, the chosen port is logged, and a "listener" thread running
    ``handler.wait_for_kill_event`` is joined. Once that thread finishes the
    process is terminated with ``os._exit(0)``, so this function never
    returns normally.
    """
    if environ.get('DEBUGGING'):
        _await_debugger_attach()

    logger.debug('Starting grpc server..')
    worker_pool = ThreadPoolExecutor(max_workers=1)
    server = grpc.server(worker_pool)
    # Port 0 lets the OS pick a free port; the call returns the port bound.
    bound_port = server.add_insecure_port('127.0.0.1:0')
    handler = handlers.GrpcServiceHandler(server)
    spg.add_RunnerServicer_to_server(handler, server)
    logger.info('Listening on port:{}'.format(bound_port))
    server.start()

    listener = threading.Thread(
        name="listener", target=handler.wait_for_kill_event)
    listener.start()
    listener.join()
    # Hard exit: skips interpreter cleanup and does not wait on other threads.
    os._exit(0)
def connect():
    """Open a TCP connection to the local port named by GAUGE_INTERNAL_PORT.

    Returns:
        A connected ``socket.socket`` (IPv4, stream) with TCP_NODELAY enabled.

    Raises:
        KeyError: if GAUGE_INTERNAL_PORT is not set in the environment.
        ValueError: if GAUGE_INTERNAL_PORT is not a valid integer.
        OSError: if the option cannot be set or the connection fails.
    """
    # Parse the port before allocating the socket so that a missing or
    # malformed environment variable cannot leave an open descriptor behind.
    port = int(os.environ['GAUGE_INTERNAL_PORT'])
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        logger.debug("Connecting to port {}".format(port))
        s.connect(('127.0.0.1', port))
    except BaseException:
        # Release the descriptor on any failure, then let the caller see
        # the original exception unchanged.
        s.close()
        raise
    return s
def main():
    """Entry point: initialise a project or load steps and start the runner.

    Logs the running Python version, then looks at the first command-line
    argument. ``--init`` copies the skeleton files via ``copy_skel_files()``;
    any other invocation loads the step implementations and calls
    ``start()``.
    """
    logger.info("Python: {}".format(platform.python_version()))
    # sys.argv[1] does not exist when the process is launched without
    # arguments; treat that like any other non-init invocation instead of
    # failing with IndexError.
    init_requested = len(sys.argv) > 1 and sys.argv[1] == "--init"
    if init_requested:
        logger.debug("Initializing gauge project.")
        copy_skel_files()
    else:
        load_implementations()
        start()
def load_implementations():
    """Load step implementations from the configured directories.

    The directory list comes from ``get_step_impl_dirs()``. Every directory
    that does not exist is reported with an error log entry, after which the
    complete list (missing entries included) is handed to ``load_files``.
    """
    impl_dirs = get_step_impl_dirs()
    joined_dirs = ', '.join(impl_dirs)
    logger.debug(
        "Loading step implementations from {} dirs.".format(joined_dirs))

    for candidate in impl_dirs:
        if path.exists(candidate):
            continue
        # Report only; the directory is still passed on to load_files below.
        logger.error(
            'can not load implementations from {}. {} does not exist.'.format(
                candidate, candidate))

    load_files(impl_dirs)
def Kill(self, request, context):
    """Handle a Kill request by setting this handler's kill event.

    ``request`` and ``context`` are accepted but not used here.

    Returns:
        A new ``Empty`` message.
    """
    logger.debug("KillProcessrequest received")
    # NOTE(review): presumably whatever blocks on kill_event (e.g. a
    # shutdown listener) wakes up once it is set - confirm against the class.
    kill_signal = self.kill_event
    kill_signal.set()
    reply = Empty()
    return reply