Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_nmg_no_cell_area():
    """Check that FlowAccumulator rejects a NetworkModelGrid with FieldError.

    A four-node, three-link network grid is built and given a
    ``topographic__elevation`` node field (x + y of each node).
    Instantiating ``FlowAccumulator`` on it is expected to raise
    ``FieldError`` (per the test name, presumably because a network grid
    carries no cell area -- confirm against FlowAccumulator).
    """
    node_ys, node_xs = (0, 1, 2, 2), (0, 0, -1, 1)
    links = ((1, 0), (2, 1), (3, 1))
    network = NetworkModelGrid((node_ys, node_xs), links)

    # Elevation is simply the sum of the node coordinates.
    elevation = network.x_of_node + network.y_of_node
    network.add_field("topographic__elevation", elevation, at="node")

    with pytest.raises(FieldError):
        FlowAccumulator(network)
def test_check_fields(sink_grid1):
    """Verify which node fields exist after building a SinkFillerBarnes.

    After instantiation, ``sediment_fill__depth`` must exist at nodes and
    be all zeros, while looking up ``drainage_area`` must raise
    ``FieldError``.
    """
    SinkFillerBarnes(sink_grid1)

    expected_depth = np.zeros(sink_grid1.number_of_nodes)
    fill_depth = sink_grid1.at_node["sediment_fill__depth"]
    assert_array_equal(expected_depth, fill_depth)

    # The bare lookup is the assertion: it must fail with FieldError.
    with pytest.raises(FieldError):
        sink_grid1.at_node["drainage_area"]
def test_missing_fields():
    """Check that DrainageDensity raises FieldError on an under-populated grid.

    The grid only carries ``topographic__elevation``; all five threshold
    parameters are supplied, so the error is expected to come from fields
    the component needs but the grid lacks.
    """
    grid = RasterModelGrid((10, 10))
    grid.add_zeros("node", "topographic__elevation")

    threshold_kwargs = {
        "area_coefficient": 1,
        "slope_coefficient": 1,
        "area_exponent": 1,
        "slope_exponent": 1,
        "channelization_threshold": 1,
    }
    with pytest.raises(FieldError):
        DrainageDensity(grid, **threshold_kwargs)
def test_no_upstream_array():
    """Test that correct error is raised when no flow__upstream_node_order.

    Only a flow director is run (no accumulator), after which
    ``calculate_distance_to_divide`` is expected to raise ``FieldError``.
    """
    # Pass the shape as a single (rows, columns) tuple; the two-integer
    # positional form is deprecated and the second value would be read as
    # a different argument by the current constructor signature.
    mg = RasterModelGrid((30, 70))
    mg.add_ones("node", "topographic__elevation")
    mg.add_ones("node", "drainage_area")

    fd = FlowDirectorSteepest(mg)
    fd.run_one_step()

    with pytest.raises(FieldError):
        calculate_distance_to_divide(mg)
if channelization_threshold is not None:
warn('Channel mask and channelization '
'threshold supplied. Defaulting '
'to channel mask, ignoring '
'threshold.')
if grid.number_of_nodes != len(channel__mask):
raise ValueError('Length of channel mask is not equal to '
'number of grid nodes')
if 'channel__mask' in grid.at_node:
warn("Existing channel__mask grid field was overwritten.")
grid.at_node['channel__mask'] = channel__mask
if channel__mask is None:
if area_coefficient is None:
raise FieldError('No channel mask and no area '
'coefficient supplied. Either '
'a channel mask or all 5 threshold '
'parameters are needed.')
if slope_coefficient is None:
raise FieldError('No channel mask and no slope '
'coefficient supplied. Either '
'a channel mask or all 5 threshold '
'parameters are needed.')
if area_exponent is None:
raise FieldError('No channel mask and no area '
'exponent supplied. Either '
'a channel mask or all 5 threshold '
'parameters are needed.')
if slope_exponent is None:
raise FieldError('No channel mask and no slope '
'exponent supplied. Either '
raise FieldError('No channel mask and no area '
'coefficient supplied. Either '
'a channel mask or all 5 threshold '
'parameters are needed.')
if slope_coefficient is None:
raise FieldError('No channel mask and no slope '
'coefficient supplied. Either '
'a channel mask or all 5 threshold '
'parameters are needed.')
if area_exponent is None:
raise FieldError('No channel mask and no area '
'exponent supplied. Either '
'a channel mask or all 5 threshold '
'parameters are needed.')
if slope_exponent is None:
raise FieldError('No channel mask and no slope '
'exponent supplied. Either '
'a channel mask or all 5 threshold '
'parameters are needed.')
if channelization_threshold is None:
raise FieldError('No channel mask and no channelization '
'threshold supplied. Either '
'a channel mask or all 5 threshold '
'parameters are needed.')
channel__mask = (area_coefficient * \
np.power(grid.at_node['drainage_area'], area_exponent) \
* slope_coefficient * \
np.power(grid.at_node['topographic__steepest_slope'], \
slope_exponent)) > channelization_threshold
grid.at_node['channel__mask'] = channel__mask
required = ('flow__receiver_node', 'flow__link_to_receiver_node',
self._lake_map.fill(self._grid.BAD_INDEX)
self._depression_outlet_map.fill(self._grid.BAD_INDEX)
self._depression_depth.fill(0.0)
self._depression_outlets = [] # reset these
# Locate nodes with pits
if isinstance(self._user_supplied_pits, str):
try:
pits = self._grid.at_node[self._user_supplied_pits]
supplied_pits = np.where(pits)[0]
self._pit_node_ids = as_id_array(
np.setdiff1d(supplied_pits, self._grid.boundary_nodes)
)
self._number_of_pits = self._pit_node_ids.size
self._is_pit.fill(False)
self._is_pit[self._pit_node_ids] = True
except FieldError:
self._find_pits()
elif self._user_supplied_pits is None:
self._find_pits()
else: # hopefully an array or other sensible iterable
if len(self._user_supplied_pits) == self._grid.number_of_nodes:
supplied_pits = np.where(self._user_supplied_pits)[0]
else: # it's an array of node ids
supplied_pits = self._user_supplied_pits
# remove any boundary nodes from the supplied pit list
self._pit_node_ids = as_id_array(
np.setdiff1d(supplied_pits, self._grid.boundary_nodes)
)
self._number_of_pits = self._pit_node_ids.size
self._is_pit.fill(False)
self._is_pit[self._pit_node_ids] = True
if not self._use_diags:
g = self._grid.zeros(at="link")
qs = self._grid.zeros(at="link")
try:
self._g = self._grid.add_field(
"topographic__gradient", g, at="link", clobber=False
)
# ^note this will object if this exists already
except FieldError: # keep a ref
self._g = self._grid.at_link["topographic__gradient"]
try:
self._qs = self._grid.add_field(
"hillslope_sediment__unit_volume_flux", qs, at="link",
clobber=False
)
except FieldError:
self._qs = (
self._grid.at_link["hillslope_sediment__unit_volume_flux"]
)
# note all these terms are deliberately loose, as we won't always
# be dealing with topo
else:
g = np.zeros(self._grid.number_of_d8, dtype=float)
qs = np.zeros(self._grid.number_of_d8, dtype=float)
self._g = g
self._qs = qs
# now we have to choose what the face width of a diagonal is...
# Adopt a regular octagon config if it's a square raster, and
# stretch this geometry as needed.
# Conceptually, this means we're passing mass across diamond-
# shaped holes centered at the corners.
# Note that this WON'T affect the inferred cell size - that's
following Chow, 1959. (s m^(-1./3.))
index_flow_depth : float
The flow depth above which it is assumed that Manning's n is
constant. (m)
veg_drag_exponent :
An exponent related to vegetation drag effects, which increases
effective Manning's n at low flow conditions.
"""
## Looks for a field called 'mannings_n' attached to the grid instance. If
## one is found, a FieldError is thrown but ignored. This method
## REWRITES over the existing Manning's n fields after the calculation.
try:
grid.add_zeros('mannings_n', at='node')
except FieldError:
pass
## Identifies locations where water depth is lower than the value supplied
## through keyword index_flow_depth.
(locs_less, ) = np.where(grid.at_node['surface_water__depth'] <=
index_flow_depth)
## Identifies locations where water depth is greater than the value
## supplied through keyword index_flow_depth.
(locs_more, ) = np.where(grid.at_node['surface_water__depth'] >
index_flow_depth)
## At all locations lower than the index flow depth (assumed to be shallow
## flow on hillslopes), a new Manning's n value is calculated that
## incorporates effects of vegetation drag. These Manning's n values will
## be greater than the supplied Manning's n (keyword: min_mannings_n)
"""Test inputs for runoff_rate and water__unit_flux_in."""
# testing input for runoff rate, can be None, a string associated with
# a field at node, a single float or int, or an array of size number of
# nodes.
if runoff_rate is not None:
if type(runoff_rate) is str:
runoff_rate = grid.at_node[runoff_rate]
elif type(runoff_rate) in (float, int):
pass
else:
assert runoff_rate.size == grid.number_of_nodes
# test for water__unit_flux_in
try:
grid.at_node['water__unit_flux_in']
except FieldError:
if runoff_rate is None:
# assume that if runoff rate is not supplied, that the value
# should be set to one everywhere.
grid.add_ones('node', 'water__unit_flux_in', dtype=float)
else:
if type(runoff_rate) in (float, int):
grid.add_empty('node', 'water__unit_flux_in', dtype=float)
grid.at_node['water__unit_flux_in'].fill(runoff_rate)
else:
grid.at_node['water__unit_flux_in'] = runoff_rate
else:
if runoff_rate is not None:
print ("FlowAccumulator found both the field " +
"'water__unit_flux_in' and a provided float or " +
"array for the runoff_rate argument. THE FIELD IS " +
"BEING OVERWRITTEN WITH THE SUPPLIED RUNOFF_RATE!")