Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
funcx.set_stream_logger(level=logging.DEBUG if args.debug else logging.INFO)
global logger
logger = logging.getLogger('funcx')
if args.version:
logger.info("FuncX version: {}".format(funcx.__version__))
logger.debug("Command: {}".format(args.command))
args.config_file = os.path.join(args.config_dir, 'config.py')
if args.command == "init":
if args.force:
logger.debug("Forcing re-authentication via GlobusAuth")
funcx_client = FuncXClient(force_login=args.force)
init_endpoint(args)
return
if not os.path.exists(args.config_file):
logger.critical("Missing a config file at {}. Critical error. Exiting.".format(args.config_file))
logger.info("Please run the following to create the appropriate config files : \n $> funcx-endpoint init")
exit(-1)
if args.command == "init":
init_endpoint(args)
exit(-1)
logger.debug("Loading config files from {}".format(args.config_dir))
import importlib.machinery
global_config = importlib.machinery.SourceFileLoader('global_config',
self.results_outgoing = self.context.socket(zmq.DEALER)
self.results_outgoing.set_hwm(0)
logger.info("Results outgoing on tcp://{}:{}".format(client_address, client_ports[1]))
self.results_outgoing.connect("tcp://{}:{}".format(client_address, client_ports[1]))
self.command_channel = self.context.socket(zmq.DEALER)
self.command_channel.RCVTIMEO = 1000 # in milliseconds
# self.command_channel.set_hwm(0)
logger.info("Command channel on tcp://{}:{}".format(client_address, client_ports[2]))
self.command_channel.connect("tcp://{}:{}".format(client_address, client_ports[2]))
logger.info("Connected to client")
self.pending_task_queue = {}
self.containers = {}
self.total_pending_task_count = 0
self.fxs = FuncXClient()
logger.info("Interchange address is {}".format(self.interchange_address))
self.worker_ports = worker_ports
self.worker_port_range = worker_port_range
self.task_outgoing = self.context.socket(zmq.ROUTER)
self.task_outgoing.set_hwm(0)
self.results_incoming = self.context.socket(zmq.ROUTER)
self.results_incoming.set_hwm(0)
# Initialize the last heartbeat time to start the loop
self.last_heartbeat = time.time()
self.max_heartbeats_missed = max_heartbeats_missed
self.endpoint_id = endpoint_id
if self.worker_ports:
| v |
+-----+-----+-----+ v
| Start |---5---> Interchange
| Endpoint | daemon
+-----------------+
Parameters
----------
args : args object
Args object from the arg parsing
global_config : dict
Global config dict
"""
funcx_client = FuncXClient()
endpoint_dir = os.path.join(args.config_dir, args.name)
endpoint_json = os.path.join(endpoint_dir, 'endpoint.json')
if not os.path.exists(endpoint_dir):
print('''Endpoint {0} is not configured!
1. Please create a configuration template with:
$ funcx-endpoint configure {0}
2. Update configuration
3. Start the endpoint.
'''.format(args.name))
return
# If previous registration info exists, use that
if os.path.exists(endpoint_json):
with open(endpoint_json, 'r') as fp:
self.results_outgoing = self.context.socket(zmq.DEALER)
self.results_outgoing.set_hwm(0)
logger.info("Results outgoing on tcp://{}:{}".format(client_address, client_ports[1]))
self.results_outgoing.connect("tcp://{}:{}".format(client_address, client_ports[1]))
self.command_channel = self.context.socket(zmq.DEALER)
self.command_channel.RCVTIMEO = 1000 # in milliseconds
# self.command_channel.set_hwm(0)
logger.info("Command channel on tcp://{}:{}".format(client_address, client_ports[2]))
self.command_channel.connect("tcp://{}:{}".format(client_address, client_ports[2]))
logger.info("Connected to client")
self.pending_task_queue = {}
self.containers = {}
self.total_pending_task_count = 0
self.fxs = FuncXClient()
logger.info("Interchange address is {}".format(self.interchange_address))
self.worker_ports = worker_ports
self.worker_port_range = worker_port_range
self.task_outgoing = self.context.socket(zmq.ROUTER)
self.task_outgoing.set_hwm(0)
self.results_incoming = self.context.socket(zmq.ROUTER)
self.results_incoming.set_hwm(0)
# Initialize the last heartbeat time to start the loop
self.last_heartbeat = time.time()
self.max_heartbeats_missed = max_heartbeats_missed
self.endpoint_id = endpoint_id
if self.worker_ports:
app_name="FuncX SDK",
token_storage=JSONTokenStorage(tokens_filename))
fx_scope = "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all"
if not fx_authorizer:
self.native_client.login(requested_scopes=[fx_scope],
no_local_server=kwargs.get("no_local_server", True),
no_browser=kwargs.get("no_browser", True),
refresh_tokens=kwargs.get("refresh_tokens", True),
force=force_login)
all_authorizers = self.native_client.get_authorizers_by_scope(requested_scopes=[fx_scope])
fx_authorizer = all_authorizers[fx_scope]
super(FuncXClient, self).__init__("funcX",
environment='funcx',
authorizer=fx_authorizer,
http_timeout=http_timeout,
base_url=funcx_service_address,
**kwargs)
self.fx_serializer = FuncXSerializer()
def __init__(self, fxc=None, batch_status=True, local=False,
last_n=3, log_level='DEBUG', *args, **kwargs):
self._fxc = fxc or FuncXClient(*args, **kwargs)
# Special Dill serialization so that wrapped methods work correctly
self._fxc.fx_serializer.use_custom('03\n', 'code')
# Track all pending tasks (organized by endpoint) and results
self._pending = {}
self._results = {}
self._completed_tasks = set()
self._use_batch_status = batch_status
# Average times for tasks
self._last_n = last_n
self._exec_times = defaultdict(lambda: defaultdict(Queue))
self._avg_exec_time = defaultdict(lambda: defaultdict(float))
self._num_executions = defaultdict(lambda: defaultdict(int))
# Set logging levels
scopes = [fx_scope, search_scope]
search_authorizer = None
if not fx_authorizer:
self.native_client.login(requested_scopes=scopes,
no_local_server=kwargs.get("no_local_server", True),
no_browser=kwargs.get("no_browser", True),
refresh_tokens=kwargs.get("refresh_tokens", True),
force=force_login)
all_authorizers = self.native_client.get_authorizers_by_scope(requested_scopes=scopes)
fx_authorizer = all_authorizers[fx_scope]
search_authorizer = all_authorizers[search_scope]
super(FuncXClient, self).__init__("funcX",
environment='funcx',
authorizer=fx_authorizer,
http_timeout=http_timeout,
base_url=funcx_service_address,
**kwargs)
self.fx_serializer = FuncXSerializer()
self.searcher = SearchHelper(authorizer=search_authorizer)