Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_water_layer(self):
# water layer should be clipped to the tile bounds expanded by 10%.
from ModestMaps.Core import Coordinate
from tilequeue.tile import coord_to_mercator_bounds
from shapely import wkb
tile = Coordinate(zoom=15, column=10, row=10)
bounds = coord_to_mercator_bounds(tile)
read_row = self._test('water', tile, 1.0)
clipped_shape = wkb.loads(read_row['__geometry__'])
# for water layer, the geometry should be 10% larger than the tile
# bounds.
x_factor = ((clipped_shape.bounds[2] - clipped_shape.bounds[0]) /
(bounds[2] - bounds[0]))
y_factor = ((clipped_shape.bounds[2] - clipped_shape.bounds[0]) /
(bounds[2] - bounds[0]))
self.assertAlmostEqual(1.1, x_factor)
self.assertAlmostEqual(1.1, y_factor)
# shapely.union and shapely.intersection to test expand/contract
prevent_shape_removal=True,
repair_intersections=True,
# to speedup repair intersections
use_topology=True, # TODO: Fails with TRUE because multiple chains in linearring
simplify_shared_edges=True,
simplify_non_shared_edges=True,
)
simp_geometries = self._test_geometry_simplification(geometries, simplifier, simplifier_params, constraints,
check_valid=False, check_simple=False)
# self.save_shapefile(data_path('test'), 'simp{}'.format(mode), simp_geometries)
print "Validating geometries..."
for key, simp_wkb in simp_geometries.iteritems():
try:
geom = shapely.wkb.loads(geometries[key])
simp_geom = shapely.wkb.loads(simp_wkb)
# intersection = geom.intersection(simp_geom)
# union = geom.union(simp_geom)
if mode == "Expand":
# Nothing from the original geometry is lost
diff = geom.difference(simp_geom)
if diff.area > 1e-8:
raise self.failureException("Part of original geometry is lost")
if mode == "Contract":
diff = simp_geom.difference(geom)
if diff.area > 1e-8:
raise self.failureException("Part of simplified geometry is not in the original geometry")
except Exception:
print "Failed on geometry id={}".format(key)
raise
'layer': 'boundaries',
}]
fetch = make_rawr_data_fetcher(
top_zoom, max_zoom, storage, layers, index_cfg)
for fetcher, _ in fetch.fetch_tiles(_wrap(top_tile)):
read_rows = fetcher(tile.zoom, coord_to_mercator_bounds(tile))
self.assertEqual(len(read_rows), 1)
props = read_rows[0]['__boundaries_properties__']
self.assertEqual(props.get('kind'), 'maritime')
# check no special boundaries geometry - should just be the polygon
self.assertIsNone(read_rows[0].get('__boundaries_geometry__'))
shape = shapely.wkb.loads(read_rows[0]['__geometry__'])
self.assertEqual(shape.geom_type, 'Polygon')
assert e2 == expected2
s2 = shapely.wkb.loads(bytes(e2.data))
assert isinstance(s2, Point)
assert s2.equals(p)
# Extended case: SRID=2145;POINT(1 2)
expected3 = WKBElement(
str('01010000206a080000000000000000f03f0000000000000040'),
extended=True)
e3 = from_shape(p, srid=2154, extended=True)
assert isinstance(e3, WKBElement)
assert isinstance(e3.data, buffer)
assert e3 == expected3
s3 = shapely.wkb.loads(bytes(e3.data))
assert isinstance(s, Point)
assert s3.equals(p)
if len(w.tags) == 0:
return
if w.is_closed() and closed_way_is_polygon(w.tags): # this will be handled in area()
return
try:
# NOTE: it is possible this is actually a MultiLineString
# in the case where a LineString is clipped by the clipping geom,
# or the way is self-intersecting
# but GDAL and QGIS seem to handle it OK.
linestring = None
for theme in self.mapping.themes:
if theme.matches(GeomType.LINE,w.tags):
if not linestring:
wkb = fab.create_linestring(w)
if self.clipping_geom:
sg = loads(bytes.fromhex(wkb))
if not self.prepared_clipping_geom.intersects(sg):
return
if not self.prepared_clipping_geom.contains_properly(sg):
sg = self.clipping_geom.intersection(sg)
linestring = ogr.CreateGeometryFromWkb(dumps(sg))
else:
linestring = create_geom(wkb)
for output in self.outputs:
output.write(w.id,theme.name,GeomType.LINE,linestring,w.tags)
except RuntimeError:
print("Incomplete way: {0}".format(w.id))
elif layer=='end':
cur.execute("""
select
array_agg(n.id),
coalesce(st_collect(e.geom), 'GEOMETRYCOLLECTION EMPTY'::geometry),
coalesce(st_collect(st_makeline(
st_3dlineinterpolatepoint(n.geom,.5),
st_3dlineinterpolatepoint(e.geom,.5))), 'GEOMETRYCOLLECTION EMPTY'::geometry)
from albion.end_node as e
join albion.node as n on n.id=e.node_id
where e.graph_id='{}'
""".format(self.__param["graph_id"]))
res = cur.fetchone()
lines = wkb.loads(bytes.fromhex(res[1]))
edges = wkb.loads(bytes.fromhex(res[2]))
lines_ids = res[0]
if lines_ids is None:
return
vtx = []
idx = []
colors = []
for line, edge, id_ in zip(lines, edges, lines_ids):
elt = len(idx)
self.idx_to_id_map[layer][elt] = id_
colors += [(elt >> 16 & 0xff, elt >> 8 & 0xff, elt >> 0 & 0xff, 255)]*(len(line.coords)+len(edge.coords))
idx += [(i, i+1) for i in range(len(vtx), len(vtx)+len(line.coords)-1)]
vtx += list(line.coords)
idx += [(i, i+1) for i in range(len(vtx), len(vtx)+len(edge.coords)-1)]
vtx += list(edge.coords)
self.vtx[layer] = numpy.array(vtx, dtype=numpy.float32)
if len(vtx):
for feature in layer:
# Stop reading features
if len(features) == count:
break
properties = dict()
for (index, name) in enumerate(names):
properties[name] = feature.GetField(index)
if not include_geom:
features.append(dict(type='Feature', properties=properties, geometry=None))
continue
geometry = feature.GetGeometryRef()
shape = wkb.loads(geometry.ExportToWkb())
features.append(dict(type='Feature', properties=properties, geometry=shape.__geo_interface__))
return features
from math import hypot
from shapely.wkb import loads
from shapely.geometry import *
point1 = Point(random(), random())
point2 = loads(approximate_wkb(point1.wkb))
assert hypot(point1.x - point2.x, point1.y - point2.y) < 1e-8
point1 = Point(random(), random())
point2 = Point(random(), random())
point3 = point1.union(point2)
point4 = loads(approximate_wkb(point3.wkb))
assert hypot(point3.geoms[0].x - point4.geoms[0].x, point3.geoms[0].y - point4.geoms[0].y) < 1e-8
assert hypot(point3.geoms[1].x - point4.geoms[1].x, point3.geoms[1].y - point4.geoms[1].y) < 1e-8
line1 = Point(random(), random()).buffer(1 + random(), 3).exterior
line2 = loads(approximate_wkb(line1.wkb))
assert abs(1. - line2.length / line1.length) < 1e-8
line1 = Point(random(), random()).buffer(1 + random(), 3).exterior
line2 = Point(random(), random()).buffer(1 + random(), 3).exterior
line3 = MultiLineString([line1, line2])
block_id,
direction_id,
route_id,
vehicle_id,
(ST_DumpPoints(ST_Transform(orig_geom,4326))).geom,
unnest(times)
FROM {trips}
WHERE trip_id = %(trip_id)s
""".format(**conf['db']['tables']),
{ 'trip_id':trip_id }
)
vehicle_records = []
for (bid, did, rid, vid, WGS84geom, epoch_time ) in c:
# only consider the last three variables, as the rest are
# the same for every record
WGS84geom = loadWKB(WGS84geom,hex=True)
# Vehicle( epoch_time, longitude, latitude)
vehicle_records.append( Vehicle( epoch_time, WGS84geom.x, WGS84geom.y ) )
result = {
'block_id': bid,
'direction_id': did,
'route_id': rid,
'vehicle_id': vid,
'points': vehicle_records
}
return result
code = feature.GetFieldAsString(sourceConfig.get('country_code_index'))
if code == '-99':
code = '_'+str(nextCode)
nextCode += 1
name = feature.GetFieldAsString(sourceConfig.get('country_name_index')).decode(sourceConfig.get('input_file_encoding'))
codes[name] = code
layer.ResetReading()
# load features
for feature in layer:
geometry = feature.GetGeometryRef()
geometryType = geometry.GetGeometryType()
if geometryType == ogr.wkbPolygon or geometryType == ogr.wkbMultiPolygon:
geometry.TransformTo( self.spatialRef )
shapelyGeometry = shapely.wkb.loads( geometry.ExportToWkb() )
if not shapelyGeometry.is_valid:
#buffer to fix selfcrosses
shapelyGeometry = shapelyGeometry.buffer(0, 1)
shapelyGeometry = self.applyFilters(shapelyGeometry)
if shapelyGeometry:
name = feature.GetFieldAsString(sourceConfig.get('country_name_index')).decode(sourceConfig.get('input_file_encoding'))
code = codes[name]
self.features[code] = {"geometry": shapelyGeometry, "name": name, "code": code}
else:
raise Exception, "Wrong geometry type: "+geometryType