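# Parallel kd-tree queries and map projections using multiprocessing and
# shared memory. The imports, the radius R, and the helper sketches below
# are assumptions added to make this excerpt self-contained; the original
# module may define them differently.
import ctypes
import multiprocessing as mp

import numpy as np
import pyproj

R = 6370997.0  # assumed Earth radius in metres


# The workers below view their shared buffers through a shmem_as_ndarray
# helper that the excerpt does not define. A minimal sketch, assuming the
# buffers are ctypes-backed multiprocessing.RawArray objects:
def shmem_as_ndarray(raw_array):
    # np.frombuffer wraps the RawArray's buffer without copying, so writes
    # made by worker processes are visible to the parent and vice versa.
    return np.frombuffer(raw_array, dtype=np.dtype(raw_array._type_))
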
def _parallel_query(scheduler, # scheduler for load balancing
                    # data needed to reconstruct the kd-tree
                    data, ndata, ndim, leafsize,
                    x, nx, d, i,  # query data and results
                    k, eps, p, dub,  # auxiliary query parameters
                    ierr, warn_msg):  # return values (0 on success)
    try:
        # View shared memory as ndarrays.
        _data = shmem_as_ndarray(data).reshape((ndata, ndim))
        _x = shmem_as_ndarray(x).reshape((nx, ndim))
        if k == 1:
            _d = shmem_as_ndarray(d)
            _i = shmem_as_ndarray(i)
        else:
            _d = shmem_as_ndarray(d).reshape((nx, k))
            _i = shmem_as_ndarray(i).reshape((nx, k))
        # Reconstruct the kd-tree from the shared data.
        import scipy.spatial as sp
        kdtree = sp.cKDTree(_data, leafsize=leafsize)
        # Query for nearest neighbours, using slice ranges handed out
        # by the load balancer.
        for s in scheduler:
            if k == 1:
                _d[s], _i[s] = kdtree.query(_x[s, :], k=1, eps=eps, p=p,
                                            distance_upper_bound=dub)
            else:
                _d[s, :], _i[s, :] = kdtree.query(_x[s, :], k=k, eps=eps, p=p,
                                                  distance_upper_bound=dub)
    # An error occurred: increment the return value ierr.
    # Access to ierr is serialized by multiprocessing.
    except Exception as e:
        ierr.value += 1
        warn_msg.value = str(e).encode()

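# The driver code below also relies on Scheduler and _run_jobs, neither of
# which appears in the excerpt. These are minimal sketches under the
# assumption that Scheduler hands out slices of range(n) to competing
# worker processes and that _run_jobs appends the shared error counter and
# message buffer expected by the workers; the real implementations may
# differ (e.g. support 'static' and 'dynamic' schedules).
class Scheduler(object):

    def __init__(self, n, nprocs, chunk=None, schedule='guided'):
        self._n = n
        self._nprocs = nprocs
        self._chunk = chunk if chunk else 1
        self._lock = mp.Lock()
        self._next = mp.RawValue(ctypes.c_longlong, 0)

    def __iter__(self):
        return self

    def __next__(self):
        # Guided self-scheduling: each worker grabs a shrinking share of
        # the remaining work, but never less than the minimum chunk.
        with self._lock:
            start = self._next.value
            if start >= self._n:
                raise StopIteration
            size = max((self._n - start) // (2 * self._nprocs), self._chunk)
            stop = min(start + size, self._n)
            self._next.value = stop
        return slice(start, stop)


def _run_jobs(target, args, nprocs):
    ierr = mp.Value(ctypes.c_int, 0)          # error count, lock-protected
    warn_msg = mp.Array(ctypes.c_char, 1024)  # last error message
    jobs = [mp.Process(target=target, args=args + [ierr, warn_msg])
            for _ in range(nprocs)]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()
    if ierr.value != 0:
        raise RuntimeError('%d worker process(es) failed: %s'
                           % (ierr.value, warn_msg.value.decode()))
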
# Method of the shared-memory pyproj wrapper class (its constructor is not
# shown in this excerpt); the signature and its defaults below are inferred
# from the body.
def __call__(self, data1, data2, inverse=False, radians=False,
             errcheck=False, nprocs=2, chunk=None, schedule='guided'):
    if self.is_latlong():
        return data1, data2
    grid_shape = data1.shape
    n = data1.size
    # Create shared memory.
    shmem_data1 = mp.RawArray(ctypes.c_double, n)
    shmem_data2 = mp.RawArray(ctypes.c_double, n)
    shmem_res1 = mp.RawArray(ctypes.c_double, n)
    shmem_res2 = mp.RawArray(ctypes.c_double, n)
    # View shared memory as ndarrays.
    _data1 = shmem_as_ndarray(shmem_data1)
    _data2 = shmem_as_ndarray(shmem_data2)
    _res1 = shmem_as_ndarray(shmem_res1)
    _res2 = shmem_as_ndarray(shmem_res2)
    # Copy input data to shared memory.
    _data1[:] = data1.ravel()
    _data2[:] = data2.ravel()
    # Set up a scheduler to load balance the projection.
    scheduler = Scheduler(n, nprocs, chunk=chunk, schedule=schedule)
    # Project with multiple processes.
    proj_call_args = [scheduler, shmem_data1, shmem_data2, shmem_res1,
                      shmem_res2, self._args, self._kwargs, inverse,
                      radians, errcheck]
    _run_jobs(_parallel_proj, proj_call_args, nprocs)
    # Copy the results out of shared memory, restoring the grid shape.
    return _res1.copy().reshape(grid_shape), _res2.copy().reshape(grid_shape)

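# Hypothetical usage of the projection wrapper: the class name Proj_MP and
# its constructor arguments are assumptions, since only the __call__ body
# above comes from the original code.
def _example_proj_usage():
    lons, lats = np.meshgrid(np.linspace(-180.0, 180.0, 500),
                             np.linspace(-90.0, 90.0, 250))
    proj = Proj_MP(proj='laea', lat_0=90.0, lon_0=0.0, datum='WGS84')
    # Forward projection on four processes; results keep the grid shape.
    return proj(lons, lats, nprocs=4)
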
def _parallel_proj(scheduler, data1, data2, res1, res2, proj_args, proj_kwargs,
                   inverse, radians, errcheck, ierr, warn_msg):
    try:
        # View shared memory as ndarrays.
        _data1 = shmem_as_ndarray(data1)
        _data2 = shmem_as_ndarray(data2)
        _res1 = shmem_as_ndarray(res1)
        _res2 = shmem_as_ndarray(res2)
        # Initialise pyproj.
        proj = pyproj.Proj(*proj_args, **proj_kwargs)
        # Reproject each data segment handed out by the load balancer.
        for s in scheduler:
            _res1[s], _res2[s] = proj(_data1[s], _data2[s], inverse=inverse,
                                      radians=radians, errcheck=errcheck)
    # An error occurred: increment the return value ierr.
    # Access to ierr is serialized by multiprocessing.
    except Exception as e:
        ierr.value += 1
        warn_msg.value = str(e).encode()

def _parallel_transform(scheduler, lons, lats, n, coords, ierr, warn_msg):
    try:
        # View shared memory as ndarrays.
        _lons = shmem_as_ndarray(lons)
        _lats = shmem_as_ndarray(lats)
        _coords = shmem_as_ndarray(coords).reshape((n, 3))
        # Transform lon/lat pairs to cartesian coordinates on a sphere
        # of radius R, one scheduled slice at a time.
        for s in scheduler:
            _coords[s, 0] = R * \
                np.cos(np.radians(_lats[s])) * np.cos(np.radians(_lons[s]))
            _coords[s, 1] = R * \
                np.cos(np.radians(_lats[s])) * np.sin(np.radians(_lons[s]))
            _coords[s, 2] = R * np.sin(np.radians(_lats[s]))
    # An error occurred: increment the return value ierr.
    # Access to ierr is serialized by multiprocessing.
    except Exception as e:
        ierr.value += 1
        warn_msg.value = str(e).encode()

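# _parallel_transform expects its lon/lat inputs and its (n, 3) output
# buffer in shared memory. A hypothetical driver, mirroring the setup of
# the projection method above:
def _example_transform_driver(lons, lats, nprocs=4):
    n = lons.size
    shmem_lons = mp.RawArray(ctypes.c_double, n)
    shmem_lats = mp.RawArray(ctypes.c_double, n)
    shmem_coords = mp.RawArray(ctypes.c_double, n * 3)
    shmem_as_ndarray(shmem_lons)[:] = lons.ravel()
    shmem_as_ndarray(shmem_lats)[:] = lats.ravel()
    scheduler = Scheduler(n, nprocs, schedule='guided')
    transform_args = [scheduler, shmem_lons, shmem_lats, n, shmem_coords]
    _run_jobs(_parallel_transform, transform_args, nprocs)
    # Copy the cartesian coordinates out of shared memory.
    return shmem_as_ndarray(shmem_coords).reshape((n, 3)).copy()
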
# Method of the shared-memory cKDTree wrapper class (its constructor is
# not shown); the signature mirrors cKDTree.query and is inferred from the
# body, and the trailing _run_jobs call and return, missing from the
# excerpt, follow the same pattern as the projection method above.
def query(self, x, k=1, eps=0, p=2, distance_upper_bound=np.inf):
    '''
    Same as cKDTree.query except parallelized with multiple
    processes and shared memory.
    '''
    # Allocate shared memory for x and the results.
    nx = x.shape[0]
    shmem_x = mp.RawArray(ctypes.c_double, nx * self.m)
    shmem_d = mp.RawArray(ctypes.c_double, nx * k)
    shmem_i = mp.RawArray(ctypes.c_int, nx * k)
    # View shared memory as ndarrays.
    _x = shmem_as_ndarray(shmem_x).reshape((nx, self.m))
    if k == 1:
        _d = shmem_as_ndarray(shmem_d)
        _i = shmem_as_ndarray(shmem_i)
    else:
        _d = shmem_as_ndarray(shmem_d).reshape((nx, k))
        _i = shmem_as_ndarray(shmem_i).reshape((nx, k))
    # Copy x to shared memory.
    _x[:] = x
    # Set up a scheduler to load balance the query.
    scheduler = Scheduler(nx, self._nprocs, chunk=self._chunk,
                          schedule=self._schedule)
    # Query with multiple processes.
    query_args = [scheduler, self.shmem_data, self.n, self.m,
                  self.leafsize, shmem_x, nx, shmem_d, shmem_i,
                  k, eps, p, distance_upper_bound]
    _run_jobs(_parallel_query, query_args, self._nprocs)
    # Return private copies of the results.
    return _d.copy(), _i.copy()

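# Hypothetical usage of the parallel kd-tree wrapper: the class name
# cKDTree_MP and its constructor are assumptions, since only the query
# method above comes from the original code.
def _example_kdtree_usage():
    data = np.random.rand(100000, 3)   # points to index
    x = np.random.rand(1000, 3)        # query points
    tree = cKDTree_MP(data, leafsize=10, nprocs=4)
    d, i = tree.query(x, k=5)          # distances and indices, shape (1000, 5)
    return d, i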