Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_rasterize_quadmesh(self):
qmesh = QuadMesh(([0, 1], [0, 1], np.array([[0, 1], [2, 3]])))
img = rasterize(qmesh, width=3, height=3, dynamic=False, aggregator=ds.mean('z'))
image = Image(np.array([[2., 3., np.NaN], [0, 1, np.NaN], [np.NaN, np.NaN, np.NaN]]),
bounds=(-.5, -.5, 1.5, 1.5))
self.assertEqual(img, image)
def test_rasterize_trimesh_ds_aggregator(self):
simplices = [(0, 1, 2, 0.5), (3, 2, 1, 1.5)]
vertices = [(0., 0.), (0., 1.), (1., 0), (1, 1)]
trimesh = TriMesh((simplices, vertices), vdims=['z'])
img = rasterize(trimesh, width=3, height=3, dynamic=False, aggregator=ds.mean('z'))
image = Image(np.array([[1.5, 1.5, np.NaN], [0.5, 1.5, np.NaN], [np.NaN, np.NaN, np.NaN]]),
bounds=(0, 0, 1, 1))
self.assertEqual(img, image)
from geoviews import Points, WMTS, Path
from holoviews.streams import PolyDraw, PolyEdit, PointerX
from holoviews import Curve, NdOverlay, DynamicMap, VLine, Image, TriMesh
from holoviews.operation.datashader import datashade, rasterize
from holoviews.util import Dynamic
class LineCrossSection(param.Parameterized):
"""
LineCrossSection rasterizes any HoloViews element and takes
cross-sections of the resulting Image along poly-lines drawn
using the PolyDraw tool.
"""
aggregator = param.ClassSelector(class_=ds.reductions.Reduction,
default=ds.mean())
tile_url = param.String(default='http://c.tile.openstreetmap.org/{Z}/{X}/{Y}.png',
doc="URL for the tile source", precedence=-1)
resolution = param.Number(default=1000, doc="""
Distance between samples in meters. Used for interpolation
of the cross-section paths.""")
_num_objects = None
def __init__(self, obj, paths=None, **params):
super(LineCrossSection, self).__init__(**params)
self.obj = obj
paths = [] if paths is None else paths
self.path = Path(paths, crs=ccrs.GOOGLE_MERCATOR)
self.path_stream = PolyDraw(source=self.path,
# Optimize categorical counts by aggregating them individually
if isinstance(agg_fn, ds.count_cat):
agg_params.update(dict(dynamic=False, aggregator=ds.count()))
agg_fn1 = aggregate.instance(**agg_params)
if element.ndims == 1:
grouped = element
else:
grouped = element.groupby([agg_fn.column], container_type=NdOverlay,
group_type=NdOverlay)
return grouped.clone({k: agg_fn1(v) for k, v in grouped.items()})
# Create aggregate instance for sum, count operations, breaking mean
# into two aggregates
column = agg_fn.column or 'Count'
if isinstance(agg_fn, ds.mean):
agg_fn1 = aggregate.instance(**dict(agg_params, aggregator=ds.sum(column)))
agg_fn2 = aggregate.instance(**dict(agg_params, aggregator=ds.count()))
else:
agg_fn1 = aggregate.instance(**agg_params)
agg_fn2 = None
is_sum = isinstance(agg_fn1.aggregator, ds.sum)
# Accumulate into two aggregates and mask
agg, agg2, mask = None, None, None
mask = None
for v in element:
# Compute aggregates and mask
new_agg = agg_fn1.process_element(v, None)
if is_sum:
new_mask = np.isnan(new_agg.data[column].values)
new_agg.data = new_agg.data.fillna(0)
def __init__(self, config_file, outofcore, app_port):
self.load_config_file(config_file)
self.plot_height = 600
self.plot_width = 990
self.aggregate_functions = OrderedDict()
self.aggregate_functions['Count'] = ds.count
self.aggregate_functions['Mean'] = ds.mean
self.aggregate_functions['Sum'] = ds.sum
self.aggregate_functions['Min'] = ds.min
self.aggregate_functions['Max'] = ds.max
self.agg_function_name = list(self.aggregate_functions.keys())[0]
# transfer function configuration
self.transfer_functions = OrderedDict()
self.transfer_functions['Histogram Equalization'] = 'eq_hist'
self.transfer_functions['Linear'] = 'linear'
self.transfer_functions['Log'] = 'log'
self.transfer_functions[u"\u221B - Cube Root"] = 'cbrt'
self.transfer_function = list(self.transfer_functions.values())[0]
self.basemaps = OrderedDict()
self.basemaps['Imagery'] = ('http://server.arcgisonline.com/arcgis'
'/rest/services/World_Imagery/MapServer'