Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main(args=None):
    """Benchmark entry point (only the first part is visible here).

    Parses arguments, optionally connects to a Dask scheduler, reads and
    persists the input data, then iterates over the requested cluster counts.
    NOTE(review): the loop body is truncated in this view; ``timings`` is
    presumably filled in by the missing part — confirm.

    Parameters
    ----------
    args : list of str, optional
        Forwarded to ``parse_args``; presumably ``None`` means "use
        ``sys.argv``" — confirm against ``parse_args``.
    """
    args = parse_args(args)
    # Values of n_clusters to try: range(start, stop, step).
    steps = range(args.start, args.stop, args.step)
    if args.scheduler_address:
        # Connect to an existing scheduler; presumably this also becomes the
        # default client for later dask computations — confirm.
        client = Client(args.scheduler_address)
        info = client.scheduler_info()
        logger.info("Distributed mode: %s", client.scheduler)
        # NOTE(review): assumes the scheduler advertises a "bokeh" service
        # (older distributed releases); verify on newer versions.
        logger.info("Dashboard: %s:%s", info["address"], info["services"]["bokeh"])
    else:
        logger.warning("Local mode")
    logger.info("Fitting for %s", list(steps))
    logger.info("Reading data")
    X = read().pipe(transform).pipe(as_array)
    # persist() returns a sequence here; unpack its single element.
    X, = persist(X)
    timings = []
    # NOTE(review): rebuilds the same range already stored in ``steps``.
    for n_clusters in range(args.start, args.stop, args.step):
        logger.info("Starting %02d", n_clusters)
def get_client_cluster():
    """Return the ``Client`` and ``LocalCluster`` classes from ``distributed``.

    The import happens at call time and lives in this single helper so tests
    can substitute both classes in one place.

    Returns
    -------
    tuple
        ``(Client, LocalCluster)``.
    """
    import distributed

    return distributed.Client, distributed.LocalCluster
# Create a SLURM-backed Dask cluster for this configuration.
cluster = SLURMCluster(
    cores=cluster_config["per_worker_cores"],
    memory=per_worker_memory,
    queue="aics_cpu_general",
    walltime="10:00:00",
    local_directory=str(log_dir),
    log_directory=str(log_dir),
)
client = None
try:
    # Scale cluster to the configured number of workers
    cluster.scale(cluster_config["workers"])
    # Create client connection
    client = Client(cluster)
    # Wait for a minute for the cluster to fully spin up
    time.sleep(60)
    # Run benchmark
    all_results[cluster_config["name"]] = _run_benchmark_suite(
        resources_dir=resources_dir
    )
finally:
    # Always tear the cluster down, even if the benchmark raised; otherwise
    # the SLURM workers could linger for up to the 10h walltime above.
    try:
        if client is not None:
            client.shutdown()
    finally:
        cluster.close()
# Wait for a minute for the cluster to fully shutdown
time.sleep(60)
#######################################################################
# (Fragment) Tail of the Ray branch of an ``execution_engine`` if/elif chain;
# the opening ``if`` lies outside this view.
initialize_ray()
# CPU count reported by Ray for the whole cluster.
num_cpus = ray.cluster_resources()["CPU"]
elif execution_engine == "Dask": # pragma: no cover
from distributed.client import _get_global_client
import warnings
warnings.warn("The Dask Engine for Modin is experimental.")
if threading.current_thread().name == "MainThread":
# initialize the dask client
# Reuse an already-registered global client if one exists; otherwise start
# a default Client().
client = _get_global_client()
if client is None:
from distributed import Client
client = Client()
# Sum of the values of client.ncores(); presumably worker -> core count.
num_cpus = sum(client.ncores().values())
# NOTE(review): ``client`` only exists on the main thread, so off the main
# thread ``num_cpus`` is not set by this branch; unless it is initialised
# before this chain, the DEFAULT_NPARTITIONS line raises NameError — confirm.
elif execution_engine != "Python":
raise ImportError("Unrecognized execution engine: {}.".format(execution_engine))
# One partition per detected CPU, but never fewer than 4.
DEFAULT_NPARTITIONS = max(4, int(num_cpus))
# Public API of this module (the list continues beyond this view).
__all__ = [
"DataFrame",
"Series",
"read_csv",
"read_parquet",
"read_json",
"read_html",
"read_clipboard",
"read_excel",
"read_hdf",
def __init__(self, cluster_address=None):
    """Store the connection settings for a Dask cluster.

    :param cluster_address: address of the Dask scheduler; when ``None`` it
        is read from the ``cluster_address`` option of the ``[dask]`` config
        section.
    :raises ValueError: if the resulting address is empty.
    """
    super().__init__(parallelism=0)
    address = (
        conf.get('dask', 'cluster_address')
        if cluster_address is None
        else cluster_address
    )
    if not address:
        raise ValueError('Please provide a Dask cluster address in airflow.cfg')
    self.cluster_address = address
    # ssl / tls parameters, all read from the [dask] section.
    self.tls_ca, self.tls_key, self.tls_cert = (
        conf.get('dask', option) for option in ('tls_ca', 'tls_key', 'tls_cert')
    )
    # No connection is made here; these are presumably populated later.
    self.client: Optional[Client] = None
    self.futures: Optional[Dict[Future, TaskInstanceKeyType]] = None
# (Fragment) Continues an initialiser whose header lies outside this view;
# ``options``, ``compute_file`` and ``dask_working_dir`` are defined there.
self.logger.info("Logfile set to %s\n" % options.logfile)
# Build mongo socket
self.mongod_socket = mdb.mongo_helper.MongoSocket(options.mongod_ip, options.mongod_port)
self.logger.info("Mongod Socket Info:")
self.logger.info(str(self.mongod_socket) + "\n")
# Grab the Dask Scheduler
# NOTE(review): ``loop`` is unused in the visible lines; presumably used
# further down — confirm.
loop = tornado.ioloop.IOLoop.current()
# Only set when this process starts its own in-process LocalCluster.
self.local_cluster = None
# Empty dask_ip -> start a LocalCluster; otherwise connect to dask_ip:dask_port.
if options.dask_ip == "":
self.local_cluster = distributed.LocalCluster(nanny=None)
self.dask_socket = distributed.Client(self.local_cluster)
else:
self.dask_socket = distributed.Client(options.dask_ip + ":" + str(options.dask_port))
# Send compute_file to the Dask workers, presumably so tasks can import it.
self.dask_socket.upload_file(compute_file)
self.logger.info("Dask Scheduler Info:")
self.logger.info(str(self.dask_socket) + "\n")
# Make sure the scratch is there
# NOTE(review): exists() followed by makedirs() can race with another process
# creating the directory; os.makedirs(dask_working_dir, exist_ok=True) avoids
# that on Python 3.2+.
if not os.path.exists(dask_working_dir):
os.makedirs(dask_working_dir)
# Dask Nanny
# Receives the Dask client, the Mongo socket and the logger; its behaviour
# is defined outside this view.
self.dask_nanny = DaskNanny(self.dask_socket, self.mongod_socket, logger=self.logger)
# Start up the app
# (Truncated: the handler list continues beyond this view.)
app = tornado.web.Application(
[
# Clients created by _distributed_client, keyed by the requested worker count.
_DISTRIBUTED_CLIENTS = {}


def _distributed_client(n=None):
    """Return a dask.distributed client, but cache it.

    The first call for a given ``n`` starts a ``LocalCluster`` with ``n``
    workers (``None`` leaves the choice to ``distributed``) on a randomly
    chosen scheduler port, and connects a ``Client`` to it. Later calls with
    the same ``n`` return that client while it is still running, instead of
    spawning (and leaking) another cluster each time.

    Parameters
    ----------
    n : int, optional
        Number of workers for the ``LocalCluster``.

    Returns
    -------
    distributed.Client
    """
    # Imported lazily so this module does not need distributed at import time.
    import distributed

    client = _DISTRIBUTED_CLIENTS.get(n)
    # Rebuild when nothing is cached yet or the cached client was closed.
    if client is None or client.status != "running":
        cluster = distributed.LocalCluster(n, scheduler_port=0)
        client = distributed.Client(cluster)
        _DISTRIBUTED_CLIENTS[n] = client
    return client
def check_dask(self):
    """Probe the Dask scheduler on the first node.

    Connects to ``<self.nodes[0]>:8686`` and returns the scheduler's info.
    Returns ``None`` if ``distributed`` cannot be imported, ``self.nodes`` is
    missing or empty, or the scheduler cannot be queried.
    """
    try:
        import distributed
        client = distributed.Client(self.nodes[0] + ":8686")
        try:
            return client.scheduler_info()
        finally:
            # Don't leak the connection opened just for this probe.
            client.close()
    except Exception:
        # Best-effort probe: any failure means "no usable Dask scheduler".
        return None
# Clients created by _distributed_client, keyed by the requested worker count.
_DISTRIBUTED_CLIENTS = {}


def _distributed_client(n=None):
    """Return a dask.distributed client, but cache it.

    The first call for a given ``n`` starts a ``LocalCluster`` with ``n``
    workers (``None`` leaves the choice to ``distributed``) on a randomly
    chosen scheduler port, and connects a ``Client`` to it. Later calls with
    the same ``n`` return that client while it is still running, instead of
    spawning (and leaking) another cluster each time.

    Parameters
    ----------
    n : int, optional
        Number of workers for the ``LocalCluster``.

    Returns
    -------
    distributed.Client
    """
    # Imported lazily so this module does not need distributed at import time.
    import distributed

    client = _DISTRIBUTED_CLIENTS.get(n)
    # Rebuild when nothing is cached yet or the cached client was closed.
    if client is None or client.status != "running":
        cluster = distributed.LocalCluster(n, scheduler_port=0)
        client = distributed.Client(cluster)
        _DISTRIBUTED_CLIENTS[n] = client
    return client