# Categorical aggregation of Points: count_cat('z') yields one Image per
# category, collected in an NdOverlay keyed by 'z'.
def test_aggregate_points_categorical(self):
    points = Points([(0.2, 0.3, 'A'), (0.4, 0.7, 'B'), (0, 0.99, 'C')], vdims='z')
    img = aggregate(points, dynamic=False, x_range=(0, 1), y_range=(0, 1),
                    width=2, height=2, aggregator=ds.count_cat('z'))
    xs, ys = [0.25, 0.75], [0.25, 0.75]
    expected = NdOverlay({'A': Image((xs, ys, [[1, 0], [0, 0]]), vdims='z Count'),
                          'B': Image((xs, ys, [[0, 0], [1, 0]]), vdims='z Count'),
                          'C': Image((xs, ys, [[0, 0], [1, 0]]), vdims='z Count')},
                         kdims=['z'])
    self.assertEqual(img, expected)
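For context, a minimal standalone sketch of the underlying datashader call this test exercises; the imports and toy DataFrame below are assumptions for illustration, not part of the test suite.

import pandas as pd
import datashader as ds

# Toy data mirroring the test points; count_cat requires a categorical column.
df = pd.DataFrame({'x': [0.2, 0.4, 0.0],
                   'y': [0.3, 0.7, 0.99],
                   'z': pd.Categorical(['A', 'B', 'C'])})
cvs = ds.Canvas(plot_width=2, plot_height=2, x_range=(0, 1), y_range=(0, 1))
agg = cvs.points(df, 'x', 'y', agg=ds.count_cat('z'))
print(agg.dims)        # ('y', 'x', 'z'): one 2x2 count plane per category
print(agg.sel(z='A'))  # the 2x2 count plane for category 'A'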
# count_cat over the NdOverlay kdim 'Cat', with datetime x-coordinates at
# microsecond resolution.
def test_aggregate_ndoverlay_count_cat_datetimes_microsecond_timebase(self):
    dates = pd.date_range(start="2016-01-01", end="2016-01-03", freq='1D')
    xstart = np.datetime64('2015-12-31T23:59:59.723518000', 'us')
    xend = np.datetime64('2016-01-03T00:00:00.276482000', 'us')
    curve = Curve((dates, [1, 2, 3]))
    curve2 = Curve((dates, [3, 2, 1]))
    ndoverlay = NdOverlay({0: curve, 1: curve2}, 'Cat')
    imgs = aggregate(ndoverlay, aggregator=ds.count_cat('Cat'), width=2, height=2,
                     x_range=(xstart, xend), dynamic=False)
    bounds = (np.datetime64('2015-12-31T23:59:59.723518'), 1.0,
              np.datetime64('2016-01-03T00:00:00.276482'), 3.0)
    dates = [np.datetime64('2016-01-01T11:59:59.861759000'),
             np.datetime64('2016-01-02T12:00:00.138241000')]
    expected = Image((dates, [1.5, 2.5], [[1, 0], [0, 2]]),
                     datatype=['xarray'], bounds=bounds, vdims='Count')
    expected2 = Image((dates, [1.5, 2.5], [[0, 1], [1, 1]]),
                      datatype=['xarray'], bounds=bounds, vdims='Count')
    self.assertEqual(imgs[0], expected)
    self.assertEqual(imgs[1], expected2)
def test_aggregate_points_categorical_zero_range(self):
    points = Points([(0.2, 0.3, 'A'), (0.4, 0.7, 'B'), (0, 0.99, 'C')], vdims='z')
    img = aggregate(points, dynamic=False, x_range=(0, 0), y_range=(0, 1),
                    aggregator=ds.count_cat('z'))
    xs, ys = [], [0.25, 0.75]
    params = dict(bounds=(0, 0, 0, 1), xdensity=1)
    expected = NdOverlay({'A': Image((xs, ys, np.zeros((2, 0))), vdims='z Count', **params),
                          'B': Image((xs, ys, np.zeros((2, 0))), vdims='z Count', **params),
                          'C': Image((xs, ys, np.zeros((2, 0))), vdims='z Count', **params)},
                         kdims=['z'])
    self.assertEqual(img, expected)
def create_aggregate(self, plot_width, plot_height, x_range, y_range,
                     agg_field, x_field, y_field, agg_func, glyph):
    canvas = ds.Canvas(plot_width=plot_width,
                       plot_height=plot_height,
                       x_range=x_range,
                       y_range=y_range)
    method = getattr(canvas, glyph)

    # handle categorical field
    if agg_field in self.categorical_fields:
        agg = method(self.df, x_field, y_field, ds.count_cat(agg_field))
    # handle ordinal field
    elif agg_field in self.ordinal_fields:
        func = self.aggregate_functions[agg_func]
        agg = method(self.df, x_field, y_field, func(agg_field))
    else:
        agg = method(self.df, x_field, y_field)
    return agg
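A rough standalone illustration of the same dispatch (categorical fields get ds.count_cat, ordinal fields a reduction such as ds.mean, anything else a plain count); the toy data and column names below are assumptions, not from the original.

import numpy as np
import pandas as pd
import datashader as ds

rng = np.random.default_rng(0)
df = pd.DataFrame({'x': rng.random(1000),
                   'y': rng.random(1000),
                   'species': pd.Categorical(rng.choice(list('ABC'), 1000)),
                   'weight': rng.random(1000)})
cvs = ds.Canvas(plot_width=400, plot_height=400)

cat_agg = cvs.points(df, 'x', 'y', ds.count_cat('species'))  # categorical field
ord_agg = cvs.points(df, 'x', 'y', ds.mean('weight'))        # ordinal field
default = cvs.points(df, 'x', 'y')                           # plain ds.count()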
# Fragment of a geometry rasterization method (excerpted mid-method): build the
# Canvas, make the aggregated column categorical when count_cat is used,
# dispatch on the element type, then split a categorical (3D) result into one
# layer per category.
(x_range, y_range), (xs, ys), (width, height), (xtype, ytype) = info
x0, x1 = x_range
y0, y1 = y_range
params = self._get_agg_params(element, xdim, ydim, agg_fn, (x0, y0, x1, y1))

if width == 0 or height == 0:
    return self._empty_agg(element, xdim, ydim, width, height, xs, ys, agg_fn, **params)

cvs = ds.Canvas(plot_width=width, plot_height=height,
                x_range=x_range, y_range=y_range)

if element.interface.datatype != 'spatialpandas':
    element = element.clone(datatype=['spatialpandas'])
data = element.data
if isinstance(agg_fn, ds.count_cat):
    data[agg_fn.column] = data[agg_fn.column].astype('category')
col = element.interface.geo_column(element.data)

if isinstance(element, Polygons):
    agg = cvs.polygons(data, geometry=col, agg=agg_fn)
elif isinstance(element, Path):
    agg = cvs.line(data, geometry=col, agg=agg_fn)
elif isinstance(element, Points):
    agg = cvs.points(data, geometry=col, agg=agg_fn)

if agg.ndim == 2:
    return self.p.element_type(agg, **params)
else:
    layers = {}
    for c in agg.coords[agg_fn.column].data:
        cagg = agg.sel(**{agg_fn.column: c})
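The per-category split at the end of that fragment relies on the category coordinate datashader attaches to a count_cat aggregate; below is a small self-contained sketch of the same pattern, with toy data and illustrative names.

import numpy as np
import pandas as pd
import datashader as ds

rng = np.random.default_rng(1)
df = pd.DataFrame({'x': rng.random(100), 'y': rng.random(100),
                   'cat': pd.Categorical(rng.choice(['A', 'B'], 100))})
agg = ds.Canvas(plot_width=4, plot_height=4).points(df, 'x', 'y', ds.count_cat('cat'))
# The aggregate is 3D (y, x, cat); select one 2D count grid per category value.
layers = {c: agg.sel(cat=c) for c in agg.coords['cat'].data}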
# Fragment of a point-datashading helper: aggregate with count_cat over a label
# column and shade the result, deriving the colour key from the labels.
extent = _get_extent(points)
canvas = ds.Canvas(plot_width=width,
                   plot_height=height,
                   x_range=(extent[0], extent[1]),
                   y_range=(extent[2], extent[3]))
data = pd.DataFrame(points, columns=('x', 'y'))

# Color by labels
if labels is not None:
    if labels.shape[0] != points.shape[0]:
        raise ValueError('Labels must have a label for '
                         'each sample (size mismatch: {} {})'.format(labels.shape[0],
                                                                     points.shape[0]))
    data['label'] = pd.Categorical(labels)
    aggregation = canvas.points(data, 'x', 'y', agg=ds.count_cat('label'))
    if color_key is None and color_key_cmap is None:
        result = tf.shade(aggregation, how='eq_hist')
    elif color_key is None:
        unique_labels = np.unique(labels)
        num_labels = unique_labels.shape[0]
        color_key = _to_hex(plt.get_cmap(color_key_cmap)(np.linspace(0, 1, num_labels)))
        result = tf.shade(aggregation, color_key=color_key, how='eq_hist')
    else:
        result = tf.shade(aggregation, color_key=color_key, how='eq_hist')
# Color by values
elif values is not None:
    if values.shape[0] != points.shape[0]:
        raise ValueError('Values must have a value for '
                         'each sample (size mismatch: {} {})'.format(values.shape[0],
                                                                     points.shape[0]))
def applies(cls, element, agg_fn):
    # True for NdOverlays that can be aggregated layer-by-layer: count/sum/mean
    # aggregators whose column is not an overlay kdim, or count_cat on an overlay kdim.
    return (isinstance(element, NdOverlay) and
            ((isinstance(agg_fn, (ds.count, ds.sum, ds.mean)) and
              (agg_fn.column is None or agg_fn.column not in element.kdims)) or
             (isinstance(agg_fn, ds.count_cat) and agg_fn.column in element.kdims)))

# Corresponding dispatch inside the aggregation operation's _process method:
if (isinstance(element, NdOverlay) and
        ((isinstance(agg_fn, (ds.count, ds.sum, ds.mean)) and
          (agg_fn.column is None or agg_fn.column not in element.kdims)) or
         (isinstance(agg_fn, ds.count_cat) and agg_fn.column in element.kdims))):
    return self._aggregate_ndoverlay(element, agg_fn)
# Fragment of an aggregation operation: set up the Canvas, derive the output
# value dimension name ('<column> Count' for count_cat), aggregate the element's
# dataframe with the requested glyph, and restore datetime coordinates.
x, y, data, glyph = self.get_agg_data(element, category)
(x_range, y_range), (xs, ys), (width, height), (xtype, ytype) = self._get_sampling(element, x, y)

if x is None or y is None:
    xarray = xr.DataArray(np.full((height, width), np.NaN, dtype=np.float32),
                          dims=['y', 'x'], coords={'x': xs, 'y': ys})
    return self.p.element_type(xarray)

cvs = ds.Canvas(plot_width=width, plot_height=height,
                x_range=x_range, y_range=y_range)

column = agg_fn.column
if column and isinstance(agg_fn, ds.count_cat):
    name = '%s Count' % agg_fn.column
else:
    name = column
vdims = [element.get_dimension(column)(name) if column
         else Dimension('Count')]
params = dict(get_param_values(element), kdims=[x, y],
              datatype=['xarray'], vdims=vdims)

dfdata = PandasInterface.as_dframe(data)
agg = getattr(cvs, glyph)(dfdata, x.name, y.name, self.p.aggregator)
if 'x_axis' in agg and 'y_axis' in agg:
    agg = agg.rename({'x_axis': x, 'y_axis': y})
if xtype == 'datetime':
    agg[x.name] = agg[x.name].astype('datetime64[us]')
if ytype == 'datetime':
    agg[y.name] = agg[y.name].astype('datetime64[us]')
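The coordinate fix-ups at the end of that fragment (renaming datashader's generic axis names and casting the numeric coordinates back to datetime64[us]) can be illustrated on a bare xarray object; the values and names here are illustrative only.

import numpy as np
import xarray as xr

agg = xr.DataArray(np.zeros((2, 2)),
                   dims=['y_axis', 'x_axis'],
                   coords={'x_axis': np.array([0, 1_000_000], dtype='int64'),
                           'y_axis': [0.5, 1.5]})
agg = agg.rename({'x_axis': 'time', 'y_axis': 'y'})
agg['time'] = agg['time'].astype('datetime64[us]')  # 0 -> 1970-01-01T00:00:00, etc.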
# Continuation of the point-datashading helper: colour by a numeric value
# column (binned into at most 256 categories) or fall back to plain density.
elif values is not None:
    if values.shape[0] != points.shape[0]:
        raise ValueError('Values must have a value for '
                         'each sample (size mismatch: {} {})'.format(values.shape[0],
                                                                     points.shape[0]))
    unique_values = np.unique(values)
    if unique_values.shape[0] >= 256:
        min_val, max_val = np.min(values), np.max(values)
        bin_size = (max_val - min_val) / 256.0
        data['val_cat'] = pd.Categorical(np.round((values - min_val) / bin_size).astype(np.int16))
        aggregation = canvas.points(data, 'x', 'y', agg=ds.count_cat('val_cat'))
        color_key = _to_hex(plt.get_cmap(cmap)(np.linspace(0, 1, 256)))
        result = tf.shade(aggregation, color_key=color_key, how='eq_hist')
    else:
        data['val_cat'] = pd.Categorical(values)
        aggregation = canvas.points(data, 'x', 'y', agg=ds.count_cat('val_cat'))
        color_key_cols = _to_hex(plt.get_cmap(cmap)(np.linspace(0, 1, unique_values.shape[0])))
        color_key = dict(zip(unique_values, color_key_cols))
        result = tf.shade(aggregation, color_key=color_key, how='eq_hist')
# Color by density (default datashader option)
else:
    aggregation = canvas.points(data, 'x', 'y', agg=ds.count())
    result = tf.shade(aggregation, cmap=plt.get_cmap(cmap))

if background is not None:
    result = tf.set_background(result, background)

if ax is not None:
    _embed_datashader_in_an_axis(result, ax)
    return ax
else:
    return result
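A compact, self-contained sketch of the count_cat-plus-shade pattern this helper wraps, with assumed imports, toy data, and an explicit color_key (all names and values illustrative).

import numpy as np
import pandas as pd
import datashader as ds
import datashader.transfer_functions as tf

rng = np.random.default_rng(2)
df = pd.DataFrame({'x': rng.normal(size=10_000), 'y': rng.normal(size=10_000),
                   'label': pd.Categorical(rng.choice(['a', 'b', 'c'], 10_000))})
canvas = ds.Canvas(plot_width=300, plot_height=300)
agg = canvas.points(df, 'x', 'y', agg=ds.count_cat('label'))
img = tf.shade(agg, color_key={'a': '#1f77b4', 'b': '#ff7f0e', 'c': '#2ca02c'},
               how='eq_hist')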
def datashade_ndcurve(ovly, kdim=None, spread=False):
    if not kdim:
        kdim = ovly.kdims[0].name

    var = np.unique(ovly.dimension_values(kdim)).tolist()
    color_key = [(v, Category10_10[iv]) for iv, v in enumerate(var)]
    color_pts = hv.NdOverlay(
        {k: hv.Points([0, 0], label=str(k)).opts(style=dict(color=v))
         for k, v in color_key})
    ds_ovly = datashade(
        ovly,
        aggregator=count_cat(kdim),
        color_key=dict(color_key),
        min_alpha=200,
        normalization='linear')
    if spread:
        ds_ovly = dynspread(ds_ovly)
    return ds_ovly * color_pts
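A hedged usage sketch for the helper above; the imports and the toy NdOverlay of curves are assumptions, since the original snippet does not show them.

import numpy as np
import holoviews as hv
from holoviews.operation.datashader import datashade, dynspread
from datashader import count_cat
from bokeh.palettes import Category10_10

hv.extension('bokeh')

xs = np.linspace(0, 1, 1000)
curves = hv.NdOverlay({name: hv.Curve((xs, np.sin((i + 1) * np.pi * xs)))
                       for i, name in enumerate(['a', 'b', 'c'])}, kdims='k')
shaded = datashade_ndcurve(curves, kdim='k', spread=True)

Note that the helper calls the older group-based .opts(style=dict(...)) API, so it targets a pre-1.14 HoloViews release.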