Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
elif result['type'] == 'way':
# Parse POI area Polygon
poi_area = parse_polygonal_poi(coords=coords, response=result)
if poi_area:
# Add element_type
poi_area['element_type'] = 'way'
# Add to 'poi_ways'
poi_ways[result['id']] = poi_area
elif result['type'] == 'relation':
# Add relation to a relation list (needs to be parsed after all nodes and ways have been parsed)
relations.append(result)
# Create GeoDataFrames
gdf_nodes = gpd.GeoDataFrame(poi_nodes).T
gdf_nodes.crs = settings.default_crs
gdf_ways = gpd.GeoDataFrame(poi_ways).T
gdf_ways.crs = settings.default_crs
# Parse relations (MultiPolygons) from 'ways'
gdf_ways = parse_osm_relations(relations=relations, osm_way_df=gdf_ways)
# Combine GeoDataFrames
gdf = gdf_nodes.append(gdf_ways, sort=False)
return gdf
for relation_key, relation_val in relations.items():
relation_val['geometry'] = create_relation_geometry(relation_key, relation_val, footprints)
# merge relations into the footprints dictionary
footprints.update(relations)
# delete supporting geometry not directly tagged with footprint_type from the footprints dictionary
for untagged_way in untagged_ways:
try:
del footprints[untagged_way]
except KeyError:
log('untagged_way {} not found in footprints dict'.format(untagged_way))
# Convert footprints dictionary to a GeoDataFrame
gdf = gpd.GeoDataFrame.from_dict(footprints, orient='index')
gdf.crs = settings.default_crs
# filter the gdf to only include valid Polygons or MultiPolygons
if not retain_invalid:
filter1 = gdf['geometry'].is_valid
filter2 = (gdf['geometry'].geom_type == 'Polygon') | (gdf['geometry'].geom_type == 'MultiPolygon')
filter_combined = filter1 & filter2
gdf = gdf[filter_combined]
return gdf
features = [{'type': 'Feature',
'geometry': geometry,
'properties': {'place_name': place,
'bbox_north': bbox_north,
'bbox_south': bbox_south,
'bbox_east': bbox_east,
'bbox_west': bbox_west}}]
# if we got an unexpected geometry type (like a point), log a warning
if geometry['type'] not in ['Polygon', 'MultiPolygon']:
log('OSM returned a {} as the geometry.'.format(geometry['type']), level=lg.WARNING)
# create the GeoDataFrame, name it, and set its original CRS to default_crs
gdf = gpd.GeoDataFrame.from_features(features)
gdf.gdf_name = gdf_name
gdf.crs = settings.default_crs
# if buffer_dist was passed in, project the geometry to UTM, buffer it
# in meters, then project it back to lat-long
if buffer_dist is not None:
gdf_utm = project_gdf(gdf)
gdf_utm['geometry'] = gdf_utm['geometry'].buffer(buffer_dist)
gdf = project_gdf(gdf_utm, to_latlong=True)
log('Buffered the GeoDataFrame "{}" to {} meters'.format(gdf.gdf_name, buffer_dist))
# return the gdf
log('Created GeoDataFrame with {} row for query "{}"'.format(len(gdf), query))
return gdf
else:
# if there was no data returned (or fewer results than which_result
# specified)
log('OSM returned no results (or fewer than which_result) for query "{}"'.format(query), level=lg.WARNING)
imgs_folder=settings.imgs_folder,
cache_folder=settings.cache_folder,
use_cache=settings.use_cache,
log_file=settings.log_file,
log_console=settings.log_console,
log_level=settings.log_level,
log_name=settings.log_name,
log_filename=settings.log_filename,
useful_tags_node=settings.useful_tags_node,
useful_tags_path=settings.useful_tags_path,
osm_xml_node_attrs=settings.osm_xml_node_attrs,
osm_xml_node_tags=settings.osm_xml_node_tags,
osm_xml_way_attrs=settings.osm_xml_way_attrs,
osm_xml_way_tags=settings.osm_xml_way_tags,
default_access=settings.default_access,
default_crs=settings.default_crs,
default_user_agent=settings.default_user_agent,
default_referer=settings.default_referer,
default_accept_language=settings.default_accept_language,
nominatim_endpoint=settings.nominatim_endpoint,
nominatim_key=settings.nominatim_key,
overpass_endpoint=settings.overpass_endpoint):
"""
Configure osmnx by setting the default global vars to desired values.
Parameters
---------
data_folder : string
where to save and load data files
logs_folder : string
where to write the log files
imgs_folder : string
-------
networkx multidigraph
"""
log('Creating networkx graph from downloaded OSM data...')
start_time = time.time()
# make sure we got data back from the server requests
elements = []
for response_json in response_jsons:
elements.extend(response_json['elements'])
if len(elements) < 1:
raise EmptyOverpassResponse('There are no data elements in the response JSON objects')
# create the graph as a MultiDiGraph and set the original CRS to default_crs
G = nx.MultiDiGraph(name=name, crs=settings.default_crs)
# extract nodes and paths from the downloaded osm data
nodes = {}
paths = {}
for osm_data in response_jsons:
nodes_temp, paths_temp = parse_osm_nodes_paths(osm_data)
for key, value in nodes_temp.items():
nodes[key] = value
for key, value in paths_temp.items():
paths[key] = value
# add each osm node to the graph
for node, data in nodes.items():
G.add_node(node, **data)
# add each osm way (aka, path) to the graph
settings.logs_folder = logs_folder
settings.log_console = log_console
settings.log_file = log_file
settings.log_level = log_level
settings.log_name = log_name
settings.log_filename = log_filename
settings.useful_tags_node = useful_tags_node
settings.useful_tags_path = useful_tags_path
settings.useful_tags_node = list(set(useful_tags_node + osm_xml_node_attrs + osm_xml_node_tags))
settings.useful_tags_path = list(set(useful_tags_path + osm_xml_way_attrs + osm_xml_way_tags))
settings.osm_xml_node_attrs = osm_xml_node_attrs
settings.osm_xml_node_tags = osm_xml_node_tags
settings.osm_xml_way_attrs = osm_xml_way_attrs
settings.osm_xml_way_tags = osm_xml_way_tags
settings.default_access = default_access
settings.default_crs = default_crs
settings.default_user_agent = default_user_agent
settings.default_referer = default_referer
settings.default_accept_language = default_accept_language
settings.nominatim_endpoint = nominatim_endpoint
settings.nominatim_key = nominatim_key
settings.overpass_endpoint = overpass_endpoint
# if logging is turned on, log that we are configured
if settings.log_file or settings.log_console:
log('Configured osmnx')
default value (None) will set settings.default_crs as the CRS
to_crs : dict
if not None, just project to this CRS instead of to UTM
to_latlong : bool
if True, project from crs to lat-long, if False, project from crs to
local UTM zone
Returns
-------
tuple
(geometry_proj, crs), the projected shapely geometry and the crs of the
projected geometry
"""
if crs is None:
crs = settings.default_crs
gdf = gpd.GeoDataFrame()
gdf.crs = crs
gdf.gdf_name = 'geometry to project'
gdf['geometry'] = None
gdf.loc[0, 'geometry'] = geometry
gdf_proj = project_gdf(gdf, to_crs=to_crs, to_latlong=to_latlong)
geometry_proj = gdf_proj['geometry'].iloc[0]
return geometry_proj, gdf_proj.crs
# the ticks in so there's no space around the plot
if axis_off:
ax.axis('off')
ax.margins(0)
ax.tick_params(which='both', direction='in')
xaxis.set_visible(False)
yaxis.set_visible(False)
fig.canvas.draw()
if equal_aspect:
# make everything square
ax.set_aspect('equal')
fig.canvas.draw()
else:
# if the graph is not projected, conform the aspect ratio to not stretch the plot
if G.graph['crs'] == settings.default_crs:
coslat = np.cos((min(node_Ys) + max(node_Ys)) / 2. / 180. * np.pi)
ax.set_aspect(1. / coslat)
fig.canvas.draw()
# annotate the axis with node IDs if annotate=True
if annotate:
for node, data in G.nodes(data=True):
ax.annotate(node, xy=(data['x'], data['y']))
# save and show the figure as specified
fig, ax = save_and_show(fig, ax, save, show, close, filename, file_format, dpi, axis_off)
return fig, ax
start_time = time.time()
# if gdf has no gdf_name attribute, create one now
if not hasattr(gdf, 'gdf_name'):
gdf.gdf_name = 'unnamed'
# if to_crs was passed-in, use this value to project the gdf
if to_crs is not None:
projected_gdf = gdf.to_crs(to_crs)
# if to_crs was not passed-in, calculate the centroid of the geometry to
# determine UTM zone
else:
if to_latlong:
# if to_latlong is True, project the gdf to latlong
latlong_crs = settings.default_crs
projected_gdf = gdf.to_crs(latlong_crs)
log('Projected the GeoDataFrame "{}" to default_crs in {:,.2f} seconds'.format(gdf.gdf_name, time.time()-start_time))
else:
# else, project the gdf to UTM
# if GeoDataFrame is already in UTM, just return it
if (gdf.crs is not None) and ('+proj=utm ' in gdf.crs):
return gdf
# calculate the centroid of the union of all the geometries in the
# GeoDataFrame
avg_longitude = gdf['geometry'].unary_union.centroid.x
# calculate the UTM zone from this avg longitude and define the UTM
# CRS to project
utm_zone = int(math.floor((avg_longitude + 180) / 6.) + 1)
utm_crs = '+proj=utm +zone={} +ellps=WGS84 +datum=WGS84 +units=m +no_defs'.format(utm_zone)