def test_run_dask(fix_task_env):
    import numpy as np
    from dask import delayed as dl
    from dask.distributed import Client

    # In-process client so the test does not spawn worker processes
    dc = Client(processes=False)
    input_task_example, gathered_task_example, post_processing_task_example = (
        fix_task_env
    )
    parts = {"a": [0.0, 1.0, 2.0], "b": [-3.0, 10.0, 2.0], "c": [20.0]}
    numpoints = 20
    prefactor = 0.1

    # Build the task graph lazily with dask.delayed
    input_delayed = dl(input_task_example)(parts)
    gathered_delayed = dl(gathered_task_example, nout=1)([input_delayed], [numpoints])[
        0
    ]
    post_proc_delayed = dl(post_processing_task_example)(
        input_delayed, gathered_delayed, prefactor
    )
    # Submit the graph to the client; compute() returns a Future
    input_future = dc.compute(input_delayed)
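
# The `fix_task_env` pytest fixture is not shown above; judging from the calls it is
# assumed to provide three plain callables. A minimal, purely hypothetical version
# that would satisfy the test's call signatures:
import numpy as np
import pytest


@pytest.fixture
def fix_task_env():
    def input_task_example(parts):
        # Flatten the dict of lists into one NumPy array
        return np.concatenate([np.asarray(v) for v in parts.values()])

    def gathered_task_example(inputs, numpoints):
        # nout=1 in the test: return a 1-tuple, one grid per (input, numpoints) pair
        return tuple(
            np.linspace(arr.min(), arr.max(), n) for arr, n in zip(inputs, numpoints)
        )

    def post_processing_task_example(inp, gathered, prefactor):
        return prefactor * np.sum(inp) + np.sum(gathered)

    return input_task_example, gathered_task_example, post_processing_task_example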
def fit(self, data, args):
    params = self.configure(data, args)
    # One worker per GPU; a negative args.gpus lets dask-cuda use every available GPU
    n_workers = None if args.gpus < 0 else args.gpus
    cluster = LocalCUDACluster(n_workers=n_workers,
                               local_directory=args.root)
    client = Client(cluster)
    n_partitions = len(client.scheduler_info()['workers'])
    X_sliced, y_sliced = self.get_slices(n_partitions,
                                         data.X_train, data.y_train)
    # Build dask arrays with one chunk per worker
    X = da.concatenate([da.from_array(sub_array) for sub_array in X_sliced])
    X = X.rechunk((X_sliced[0].shape[0], data.X_train.shape[1]))
    y = da.concatenate([da.from_array(sub_array) for sub_array in y_sliced])
    y = y.rechunk(X.chunksize[0])
    dtrain = xgb.dask.DaskDMatrix(client, X, y)
    with Timer() as t:
        output = xgb.dask.train(client, params, dtrain, num_boost_round=args.ntrees)
    self.model = output['booster']
    client.close()
    cluster.close()
    return t.interval
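
# The fit() method above assumes module-level imports along the lines of
# `import xgboost as xgb`, `import dask.array as da`,
# `from dask_cuda import LocalCUDACluster`, `from dask.distributed import Client`,
# plus a Timer context manager that records elapsed wall time. Timer is not part of
# dask or xgboost; a minimal sketch of what it might look like:
import time


class Timer:
    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Elapsed wall-clock seconds, read as t.interval after the with-block
        self.interval = time.perf_counter() - self.start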
async def write(task_id, outputs):
    # Asynchronous in-process client; `await Client(...)` waits until it is connected
    async with await Client(asynchronous=True, processes=False) as client:
        outputs = cs_storage.deserialize_from_json(outputs)
        res = await client.submit(cs_storage.write, task_id, outputs)
        return res
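
# cs_storage is an external helper module not shown here. Because the coroutine uses
# an asynchronous Client, it has to be driven by an event loop; a hypothetical caller:
import asyncio

outputs_json = "{}"  # placeholder payload; real callers pass serialized task outputs
result = asyncio.run(write("task-123", outputs_json))  # "task-123" is a made-up id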
"--output=/home/b.weinstein/logs/dask-worker-%j.out"
]
cluster = SLURMCluster(
processes=1,
queue='hpg2-compute',
cores=1,
memory='13GB',
walltime='24:00:00',
job_extra=extra_args,
local_directory="/home/b.weinstein/logs/", death_timeout=300)
print(cluster.job_script())
cluster.adapt(minimum=num_workers, maximum=num_workers)
dask_client = Client(cluster)
#Start dask
dask_client.run_on_scheduler(start_tunnel)
for site in data_paths:
futures = dask_client.map(Generate.run, data_paths[site], site=site, DeepForest_config=DeepForest_config)
wait(futures)
print("{} complete".format(site))
print("All sites complete")
# Fit the tf-idf transform on the training bag-of-words matrix, then featurize both splits
tfidf.fit(train_words_binary_matrix)
train_words_tfidf_matrix = tfidf.transform(train_words_binary_matrix)
save_matrix(df_train, train_words_tfidf_matrix, train_output)
del df_train

df_test = get_df(test_input)
test_words = np.array(df_test.text.str.lower().values.astype('U'))
test_words_binary_matrix = bag_of_words.transform(test_words)
test_words_tfidf_matrix = tfidf.transform(test_words_binary_matrix)
save_matrix(df_test, test_words_tfidf_matrix, test_output)


if __name__ == '__main__':
    client = dask.distributed.Client('localhost:8786')
    np.set_printoptions(suppress=True)

    INPUT_TRAIN_TSV_PATH = conf.data_dir/'split_train_test'/'Posts-train.tsv'
    INPUT_TEST_TSV_PATH = conf.data_dir/'split_train_test'/'Posts-test.tsv'

    # Strip the ".py" suffix (str.strip('.py') removes characters, not a suffix)
    dvc_stage_name = __file__[:-len('.py')]
    STAGE_OUTPUT_PATH = conf.data_dir/dvc_stage_name
    conf.remote_mkdir(STAGE_OUTPUT_PATH).compute()
    OUTPUT_TRAIN_MATRIX_PATH = STAGE_OUTPUT_PATH/'matrix-train.p'
    OUTPUT_TEST_MATRIX_PATH = STAGE_OUTPUT_PATH/'matrix-test.p'

    config = get_params()
    MAX_FEATURES = config['max_features']

    featurize(
        INPUT_TRAIN_TSV_PATH, INPUT_TEST_TSV_PATH,
        OUTPUT_TRAIN_MATRIX_PATH, OUTPUT_TEST_MATRIX_PATH,
        MAX_FEATURES).compute()
def client(self):
    from dask.distributed import Client, default_client
    try:
        # Reuse an existing client if one is already registered in this process
        return default_client()
    except ValueError:
        # No active client: connect to our cluster if we own one, else start a local one
        if self._cluster:
            return Client(self._cluster)
        return Client()
from dask.distributed import Client


def init_client_ovr(tpw=2, nw=10, mem='6gb', p=True):
    # Local cluster: `nw` workers with `tpw` threads and `mem` memory limit each
    client = Client(processes=p, threads_per_worker=tpw, n_workers=nw, memory_limit=mem)
    client.restart()
    print(client)
    return client
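
# A possible call site (the values are illustrative only):
client = init_client_ovr(tpw=2, nw=4, mem='2gb')  # 4 workers x 2 threads, 2 GB each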
if num_workers <= 0:
    # A non-positive value means "all physical cores minus |num_workers|"
    num_workers = max(1, psutil.cpu_count(logical=False) + num_workers)
num_threads_per_worker = (num_threads_per_worker if num_threads_per_worker >= 1 else None)
print('Creating dask LocalCluster with %d worker(s), %r thread(s) per '
      'worker' % (num_workers, num_threads_per_worker))
scheduler = dask.distributed.LocalCluster(
    ip='0.0.0.0',  # Allow reaching the diagnostics port externally
    scheduler_port=0,  # Don't expose the scheduler port
    n_workers=num_workers,
    memory_limit=0,  # 0 disables the per-worker memory limit
    threads_per_worker=num_threads_per_worker,
    silence_logs=False
)
return dask.distributed.Client(scheduler)
bst = output['booster']
history = output['history']
# you can pass output directly into `predict` too.
prediction = xgb.dask.predict(client, bst, dtrain)
prediction = prediction.compute()
print('Evaluation history:', history)
return prediction


if __name__ == '__main__':
    # `LocalCUDACluster` is used for assigning GPU to XGBoost processes. Here
    # `n_workers` represents the number of GPUs since we use one GPU per worker
    # process.
    with LocalCUDACluster(n_workers=2, threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            main(client)
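
# This example appears to follow the xgboost Dask GPU demo; it assumes roughly these
# imports, plus the earlier part of main() that builds `dtrain` and calls
# xgb.dask.train() to produce `output`:
import xgboost as xgb
from dask.distributed import Client
from dask_cuda import LocalCUDACluster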
def init_client(pp_cfg):
    # pp_cfg is a dict of dask.distributed.Client keyword arguments
    # (the original stray bare `Client()` call created a second, unused client)
    client = Client(**pp_cfg)
    print(client)
    return client
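
# A hypothetical configuration: with no scheduler address given, Client() forwards
# these keyword arguments to LocalCluster.
pp_cfg = {"n_workers": 4, "threads_per_worker": 2, "memory_limit": "4GB"}
client = init_client(pp_cfg)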