Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment — the enclosing function's `def` line and the names it
# uses (memory, by_bbox, west/south/east/north, timeout, max_query_area_size,
# response_jsons, Polygon, project_geometry, consolidate_subdivide_geometry,
# overpass_request, log, time) are defined above/outside this excerpt.
# Original indentation appears to have been stripped.
# Build the optional Overpass [maxsize:...] setting: empty string means
# "use the server default"; otherwise cap server memory use at `memory` bytes.
if memory is None:
maxsize = ''
else:
maxsize = '[maxsize:{}]'.format(memory)
# define the query to send the API
if by_bbox:
# turn bbox into a polygon and project to local UTM
polygon = Polygon([(west, south), (east, south), (east, north), (west, north)])
geometry_proj, crs_proj = project_geometry(polygon)
# subdivide it if it exceeds the max area size (in meters), then project
# back to lat-long
geometry_proj_consolidated_subdivided = consolidate_subdivide_geometry(geometry_proj, max_query_area_size=max_query_area_size)
geometry, _ = project_geometry(geometry_proj_consolidated_subdivided, crs=crs_proj, to_latlong=True)
log('Requesting building footprints data within bounding box from API in {:,} request(s)'.format(len(geometry)))
start_time = time.time()
# loop through each polygon rectangle in the geometry (there will only
# be one if original bbox didn't exceed max area size)
for poly in geometry:
# represent bbox as south,west,north,east and round lat-longs to 8
# decimal places (ie, within 1 mm) so URL strings aren't different
# due to float rounding issues (for consistent caching)
west, south, east, north = poly.bounds
# Overpass QL: fetch ways and relations tagged "building" inside the
# bbox, recursing down (>) to include their member nodes; out:json so
# the response parses with response.json().
query_template = ('[out:json][timeout:{timeout}]{maxsize};((way["building"]({south:.8f},'
'{west:.8f},{north:.8f},{east:.8f});(._;>;););(relation["building"]'
'({south:.8f},{west:.8f},{north:.8f},{east:.8f});(._;>;);););out;')
query_str = query_template.format(north=north, south=south, east=east, west=west, timeout=timeout, maxsize=maxsize)
response_json = overpass_request(data={'data':query_str}, timeout=timeout)
response_jsons.append(response_json)
# NOTE(review): the next line is a truncated/unterminated string literal — this
# excerpt was spliced mid-statement; the remainder of the assignment (and the
# rest of this function) is missing from the visible source. Must be restored
# from the original file before this code can run.
msg = ('Got all building footprints data within bounding box from 
# NOTE(review): fragment — the enclosing function's `def` (a path-simplification
# routine over graph G), the `endpoints` set, the earlier `start_time`, and the
# helpers build_path/log/lg/time are defined above this excerpt.
log('Identified {:,} edge endpoints in {:,.2f} seconds'.format(len(endpoints), time.time()-start_time))
start_time = time.time()
# accumulate lists of node IDs; each list is one endpoint-to-endpoint path
paths_to_simplify = []
# for each endpoint node, look at each of its successor nodes
for node in endpoints:
for successor in G.successors(node):
if successor not in endpoints:
# if the successor is not an endpoint, build a path from the
# endpoint node to the next endpoint node
try:
path = build_path(G, successor, endpoints, path=[node, successor])
paths_to_simplify.append(path)
except RuntimeError:
log('Recursion error: exceeded max depth, moving on to next endpoint successor', level=lg.WARNING)
# recursion errors occur if some connected component is a
# self-contained ring in which all nodes are not end points.
# could also occur in extremely long street segments (eg, in
# rural areas) with too many nodes between true endpoints.
# handle it by just ignoring that component and letting its
# topology remain intact (this should be a rare occurrence)
# RuntimeError is what Python <3.5 will throw, Py3.5+ throws
# RecursionError but it is a subtype of RuntimeError so it
# still gets handled
log('Constructed all paths to simplify in {:,.2f} seconds'.format(time.time()-start_time))
return paths_to_simplify
# NOTE(review): fragment — this `else:` pairs with an `if` above the excerpt
# (presumably "if the osmid attribute is a list"); the enclosing graphml-load
# function's `def`, plus G, data, node_type, path, start_time, wkt, log, and
# time, are all defined outside this view.
else:
# if it's not a list, convert it to the node_type
data['osmid'] = node_type(data['osmid'])
# if geometry attribute exists, load the string as well-known text to
# shapely LineString
if 'geometry' in data:
data['geometry'] = wkt.loads(data['geometry'])
# remove node_default and edge_default metadata keys if they exist
# (graphml serialization artifacts, not part of the graph's own metadata)
if 'node_default' in G.graph:
del G.graph['node_default']
if 'edge_default' in G.graph:
del G.graph['edge_default']
log('Loaded graph with {:,} nodes and {:,} edges in {:,.2f} seconds from "{}"'.format(len(list(G.nodes())),
len(list(G.edges())),
time.time()-start_time,
path))
return G
# NOTE(review): fragment — the enclosing function's `def` (a place-query ->
# GeoDataFrame routine) and the names gdf, buffer_dist, query, gdf_name,
# project_gdf, gpd, log, lg are defined above this excerpt. The dangling
# `else:` below pairs with an `if` (data was returned) above the excerpt.
# if buffer_dist was passed in, project the geometry to UTM, buffer it
# in meters, then project it back to lat-long
if buffer_dist is not None:
gdf_utm = project_gdf(gdf)
gdf_utm['geometry'] = gdf_utm['geometry'].buffer(buffer_dist)
gdf = project_gdf(gdf_utm, to_latlong=True)
log('Buffered the GeoDataFrame "{}" to {} meters'.format(gdf.gdf_name, buffer_dist))
# return the gdf
log('Created GeoDataFrame with {} row for query "{}"'.format(len(gdf), query))
return gdf
else:
# if there was no data returned (or fewer results than which_result
# specified)
log('OSM returned no results (or fewer than which_result) for query "{}"'.format(query), level=lg.WARNING)
# return an empty GeoDataFrame, still carrying the caller's gdf_name so
# downstream code can identify it
gdf = gpd.GeoDataFrame()
gdf.gdf_name = gdf_name
return gdf
# NOTE(review): fragment — lines below are the tail of a docstring opened above
# this excerpt (a weighted-distance graph-truncation function taking G,
# source_node, max_distance, weight, retain_all); its `def` line and the
# remaining parameter docs are outside this view.
retain_all : bool
if True, return the entire graph even if it is not connected
Returns
-------
networkx multidigraph
"""
# get the shortest distance between the node and every other node, then
# remove every node further than max_distance away
start_time = time.time()
# work on a copy so the caller's graph is not mutated
G = G.copy()
distances = nx.shortest_path_length(G, source=source_node, weight=weight)
distant_nodes = {key:value for key, value in dict(distances).items() if value > max_distance}
G.remove_nodes_from(distant_nodes.keys())
# NOTE(review): nodes unreachable from source_node get no entry in `distances`
# at all, so they are NOT removed here — presumably get_largest_component
# below handles them; confirm against that helper.
log('Truncated graph by weighted network distance in {:,.2f} seconds'.format(time.time()-start_time))
# remove any isolated nodes and retain only the largest component (if
# retain_all is True)
if not retain_all:
G = remove_isolated_nodes(G)
G = get_largest_component(G)
return G
# NOTE(review): fragment — the enclosing figure save/show function's `def`
# (taking fig, ax, file_format, path_filename, dpi, axis_off, show, close,
# ...) and the start_time used in the first log call are defined above this
# excerpt.
if file_format == 'svg':
# if the file_format is svg, prep the fig/ax a bit for saving:
# hide the axis, stretch it to fill the figure, and make both patches
# fully transparent so the SVG has no background rectangle
ax.axis('off')
ax.set_position([0, 0, 1, 1])
ax.patch.set_alpha(0.)
fig.patch.set_alpha(0.)
# NOTE(review): bbox_inches=0 is unusual — matplotlib expects 'tight', a
# Bbox, or None here; 0 may rely on legacy behavior. Verify against the
# matplotlib version this targets.
fig.savefig(path_filename, bbox_inches=0, format=file_format, facecolor=fig.get_facecolor(), transparent=True)
else:
if axis_off:
# if axis is turned off, constrain the saved figure's extent to
# the interior of the axis
extent = ax.get_window_extent().transformed(fig.dpi_scale_trans.inverted())
else:
extent = 'tight'
fig.savefig(path_filename, dpi=dpi, bbox_inches=extent, format=file_format, facecolor=fig.get_facecolor(), transparent=True)
log('Saved the figure to disk in {:,.2f} seconds'.format(time.time()-start_time))
# show the figure if specified
if show:
start_time = time.time()
plt.show()
log('Showed the plot in {:,.2f} seconds'.format(time.time()-start_time))
# if show=False, close the figure if close=True to prevent display
elif close:
plt.close()
return fig, ax
# NOTE(review): fragment — lines below are the tail of a docstring opened above
# this excerpt (a polygon-based graph-truncation function taking G, polygon,
# quadrat_width, min_num, buffer_amount, ...); its `def` line and the rest of
# the body/return are outside this view.
min_num : int
passed on to intersect_index_quadrats: the minimum number of linear
quadrat lines (e.g., min_num=3 would produce a quadrat grid of 4
squares)
buffer_amount : numeric
passed on to intersect_index_quadrats: buffer the quadrat grid lines by
quadrat_width times buffer_amount
Returns
-------
networkx multidigraph
"""
# NOTE(review): this start_time is overwritten below before it is ever logged —
# either a timing log for the identification phase is missing, or the first
# assignment is dead code.
start_time = time.time()
# work on a copy so the caller's graph is not mutated
G = G.copy()
log('Identifying all nodes that lie outside the polygon...')
# get a GeoDataFrame of all the nodes, for spatial analysis
node_geom = [Point(data['x'], data['y']) for _, data in G.nodes(data=True)]
gdf_nodes = gpd.GeoDataFrame({'node':pd.Series(G.nodes()), 'geometry':node_geom})
gdf_nodes.crs = G.graph['crs']
# find all the nodes in the graph that lie outside the polygon
points_within_geometry = intersect_index_quadrats(gdf_nodes, polygon, quadrat_width=quadrat_width, min_num=min_num, buffer_amount=buffer_amount)
nodes_outside_polygon = gdf_nodes[~gdf_nodes.index.isin(points_within_geometry.index)]
# now remove from the graph all those nodes that lie outside the place
# polygon
start_time = time.time()
G.remove_nodes_from(nodes_outside_polygon['node'])
log('Removed {:,} nodes outside polygon in {:,.2f} seconds'.format(len(nodes_outside_polygon), time.time()-start_time))
# NOTE(review): fragment — lines below are the tail of a plotting function's
# docstring (edge styling params) followed by the start of its body; the `def`
# line and the parameters bbox, fig_width, fig_height, etc. are defined above
# this excerpt.
the color of the edges' lines
edge_linewidth : float
the width of the edges' lines
edge_alpha : float
the opacity of the edges' lines
use_geom : bool
if True, use the spatial geometry attribute of the edges to draw
geographically accurate edges, rather than just lines straight from node
to node
Returns
-------
fig, ax : tuple
"""
log('Begin plotting the graph...')
# node coordinates come from the 'x'/'y' node attributes
node_Xs = [float(x) for _, x in G.nodes(data='x')]
node_Ys = [float(y) for _, y in G.nodes(data='y')]
# get north, south, east, west values either from bbox parameter or from the
# spatial extent of the edges' geometries
if bbox is None:
edges = graph_to_gdfs(G, nodes=False, fill_edge_geometry=True)
west, south, east, north = edges.total_bounds
else:
north, south, east, west = bbox
# if caller did not pass in a fig_width, calculate it proportionately from
# the fig_height and bounding box aspect ratio
# NOTE(review): raises ZeroDivisionError when east == west (degenerate bbox,
# e.g. a single-node graph with no edges) — unguarded here.
bbox_aspect_ratio = (north-south)/(east-west)
if fig_width is None:
fig_width = fig_height / bbox_aspect_ratio
# NOTE(review): fragment — this is the interior of a per-edge loop inside a
# graph->GeoDataFrame conversion function; the `def` line, the loop header
# binding u/v/edge_details, and the `edges`/`to_return`/`start_time` setup are
# all above this excerpt.
if fill_edge_geometry:
# edge has no stored geometry: synthesize a straight line between the
# endpoint nodes' coordinates
point_u = Point((G.nodes[u]['x'], G.nodes[u]['y']))
point_v = Point((G.nodes[v]['x'], G.nodes[v]['y']))
edge_details['geometry'] = LineString([point_u, point_v])
else:
edge_details['geometry'] = np.nan
edges.append(edge_details)
# create a GeoDataFrame from the list of edges and set the CRS
gdf_edges = gpd.GeoDataFrame(edges)
gdf_edges.crs = G.graph['crs']
gdf_edges.gdf_name = '{}_edges'.format(G.graph['name'])
to_return.append(gdf_edges)
log('Created GeoDataFrame "{}" from graph in {:,.2f} seconds'.format(gdf_edges.gdf_name, time.time()-start_time))
# return both gdfs as a tuple when nodes and edges were both requested,
# otherwise return the single gdf directly
if len(to_return) > 1:
return tuple(to_return)
else:
return to_return[0]
# NOTE(review): fragment — this excerpt starts inside a loop over elevation-API
# URLs; the enclosing function's `def`, the url/results/node_points/G/
# pause_duration bindings, and the cache helpers are defined above this view.
# check if this request is already in the cache (if global use_cache=True)
cached_response_json = get_from_cache(url)
if cached_response_json is not None:
response_json = cached_response_json
else:
try:
# request the elevations from the API
log('Requesting node elevations: {}'.format(url))
# rate-limit: sleep before each live request
time.sleep(pause_duration)
response = requests.get(url)
response_json = response.json()
save_to_cache(url, response_json)
except Exception as e:
log(e)
# NOTE(review): if requests.get itself raised, `response` is unbound
# here and this line raises NameError; likewise `response_json` stays
# unbound and the results.extend below will fail. The failure should
# probably be re-raised or the chunk skipped explicitly.
log('Server responded with {}: {}'.format(response.status_code, response.reason))
# append these elevation results to the list of all results
results.extend(response_json['results'])
# sanity check that all our vectors have the same number of elements
if not (len(results) == len(G.nodes()) == len(node_points)):
raise Exception('Graph has {} nodes but we received {} results from the elevation API.'.format(len(G.nodes()), len(results)))
else:
log('Graph has {} nodes and we received {} results from the elevation API.'.format(len(G.nodes()), len(results)))
# add elevation as an attribute to the nodes
df = pd.DataFrame(node_points, columns=['node_points'])
df['elevation'] = [result['elevation'] for result in results]
df['elevation'] = df['elevation'].round(3) # round to millimeter
nx.set_node_attributes(G, name='elevation', values=df['elevation'].to_dict())
log('Added elevation data to all nodes.')