Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def split(cls, dataset, start, end, datatype, **kwargs):
objs = []
xdim, ydim = dataset.kdims[:2]
if not len(dataset.data):
return []
row = dataset.data.iloc[0]
arr = geom_to_array(row['geometry'])
d = {(xdim.name, ydim.name): arr}
d.update({vd.name: row[vd.name] for vd in dataset.vdims})
ds = dataset.clone(d, datatype=['dictionary'])
for i, row in dataset.data.iterrows():
if datatype == 'geom':
objs.append(row['geometry'])
continue
arr = geom_to_array(row['geometry'])
d = {xdim.name: arr[:, 0], ydim.name: arr[:, 1]}
d.update({vd.name: row[vd.name] for vd in dataset.vdims})
ds.data = d
if datatype == 'array':
obj = ds.array(**kwargs)
elif datatype == 'dataframe':
obj = ds.dframe(**kwargs)
elif datatype == 'columns':
obj = ds.columns(**kwargs)
elif datatype is None:
obj = ds.clone()
else:
raise ValueError("%s datatype not support" % datatype)
objs.append(obj)
return objs
def __len__(self):
return len(geom_to_array(self.data))
def holes(cls, dataset):
from shapely.geometry import Polygon, MultiPolygon
geom = dataset.data['geometry']
if isinstance(geom, Polygon):
return [[[geom_to_array(h) for h in geom.interiors]]]
elif isinstance(geom, MultiPolygon):
return [[[geom_to_array(h) for h in g.interiors] for g in geom]]
return []
def values(cls, dataset, dimension, expanded, flat):
dimension = dataset.get_dimension(dimension)
idx = dataset.get_dimension_index(dimension)
data = dataset.data
if idx not in [0, 1] and not expanded:
return data[dimension.name].values
elif not len(data):
return np.array([])
geom_type = dataset.data.geom_type.iloc[0]
values = []
columns = list(data.columns)
arr = geom_to_array(data.geometry.iloc[0])
ds = dataset.clone(arr, datatype=cls.subtypes, vdims=[])
for i, d in enumerate(data.geometry):
arr = geom_to_array(d)
if idx in [0, 1]:
ds.data = arr
values.append(ds.interface.values(ds, dimension))
else:
arr = np.full(len(arr), data.iloc[i, columns.index(dimension.name)])
values.append(arr)
if geom_type != 'Point':
values.append([np.NaN])
values = values if geom_type == 'Point' else values[:-1]
return np.concatenate(values) if values else np.array([])
def _process_element(self, element):
if not len(element):
return element.clone(crs=self.p.projection)
geom = element.geom()
vertices = geom_to_array(geom)
if isinstance(geom, (MultiPolygon, Polygon)):
obj = Polygons([vertices])
else:
obj = Path([vertices])
geom = project_path(obj, projection=self.p.projection).geom()
return element.clone(geom, crs=self.p.projection)
def values(cls, dataset, dimension, expanded, flat):
dimension = dataset.get_dimension(dimension)
idx = dataset.get_dimension_index(dimension)
data = dataset.data
if idx not in [0, 1] and not expanded:
return data[dimension.name].values
elif not len(data):
return np.array([])
geom_type = dataset.data.geom_type.iloc[0]
values = []
columns = list(data.columns)
arr = geom_to_array(data.geometry.iloc[0])
ds = dataset.clone(arr, datatype=cls.subtypes, vdims=[])
for i, d in enumerate(data.geometry):
arr = geom_to_array(d)
if idx in [0, 1]:
ds.data = arr
values.append(ds.interface.values(ds, dimension))
else:
arr = np.full(len(arr), data.iloc[i, columns.index(dimension.name)])
values.append(arr)
if geom_type != 'Point':
values.append([np.NaN])
values = values if geom_type == 'Point' else values[:-1]
return np.concatenate(values) if values else np.array([])
def shape(cls, dataset):
rows, cols = 0, len(dataset.dimensions())
if len(dataset.data) == 0: return rows, cols
arr = geom_to_array(dataset.data.geometry.iloc[0])
ds = dataset.clone(arr, datatype=cls.subtypes, vdims=[])
for d in dataset.data.geometry:
ds.data = geom_to_array(d)
r, cols = ds.interface.shape(ds)
rows += r
geom_type = dataset.data.geom_type.iloc[0]
offset = 0 if geom_type == 'Point' else len(dataset.data)-1
return rows+offset, cols
def shape(cls, dataset):
rows, cols = 0, len(dataset.dimensions())
if len(dataset.data) == 0: return rows, cols
arr = geom_to_array(dataset.data.geometry.iloc[0])
ds = dataset.clone(arr, datatype=cls.subtypes, vdims=[])
for d in dataset.data.geometry:
ds.data = geom_to_array(d)
r, cols = ds.interface.shape(ds)
rows += r
geom_type = dataset.data.geom_type.iloc[0]
offset = 0 if geom_type == 'Point' else len(dataset.data)-1
return rows+offset, cols
def range(cls, dataset, dim):
dim = dataset.get_dimension_index(dim)
if dim in [0, 1]:
ranges = []
arr = geom_to_array(dataset.data.geometry.iloc[0])
ds = dataset.clone(arr, datatype=cls.subtypes, vdims=[])
for d in dataset.data.geometry:
ds.data = geom_to_array(d)
ranges.append(ds.interface.range(ds, dim))
return max_range(ranges)
else:
dim = dataset.get_dimension(dim)
vals = dataset.data[dim.name]
return vals.min(), vals.max()
def _process_element(self, element):
if not len(element):
return element.clone(crs=self.p.projection)
geom = element.geom()
vertices = geom_to_array(geom)
if isinstance(geom, (MultiPolygon, Polygon)):
obj = Polygons([vertices])
else:
obj = Path([vertices])
geom = project_path(obj, projection=self.p.projection).geom()
return element.clone(geom, crs=self.p.projection)