Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_sp_new():
"""
Tests new style component instantiation and run.
"""
input_str = os.path.join(_THIS_DIR, "drive_sp_params.txt")
inputs = ModelParameterDictionary(input_str, auto_type=True)
nrows = inputs.read_int("nrows")
ncols = inputs.read_int("ncols")
dx = inputs.read_float("dx")
dt = inputs.read_float("dt")
time_to_run = inputs.read_float("run_time")
uplift = inputs.read_float("uplift_rate")
init_elev = inputs.read_float("init_elev")
mg = RasterModelGrid((nrows, ncols), xy_spacing=(dx, dx))
mg.set_closed_boundaries_at_grid_edges(False, False, True, True)
mg.add_zeros("topographic__elevation", at="node")
z = mg.zeros(at="node") + init_elev
numpy.random.seed(0)
mg["node"]["topographic__elevation"] = z + numpy.random.rand(len(z)) / 1000.0
from __future__ import print_function
from landlab.components.craters import impactor
from landlab import ModelParameterDictionary
from landlab import RasterModelGrid
import numpy as np
import time
import pylab
#get the needed properties to build the grid:
input_file = './craters_params.txt'
inputs = ModelParameterDictionary(input_file)
nt = inputs.read_int('number_of_craters_per_loop')
loops = inputs.read_int('number_of_loops')
#the grid should already exist in the environment
#instantiate the component; need to do this again as old version is set to force size:
craters_component = impactor(mg, input_file)
# Display a message
print( 'Running ...' )
start_time = time.time()
#perform the loops:
x = np.empty(nt)
y = np.empty(nt)
r = np.empty(nt)
slope = np.empty(nt)
def test_sp_discharges_new():
input_str = os.path.join(_THIS_DIR, "test_sp_params_discharge_new.txt")
inputs = ModelParameterDictionary(input_str, auto_type=True)
nrows = 5
ncols = 5
dx = inputs.read_float("dx")
dt = inputs.read_float("dt")
mg = RasterModelGrid((nrows, ncols), xy_spacing=dx)
mg.add_zeros("topographic__elevation", at="node")
z = np.array(
[
5.0,
5.0,
0.0,
5.0,
5.0,
5.0,
2.0,
def initialize( self, shape ):
""" Read Input File """
MPD = ModelParameterDictionary()
MPD.read_from_file(_DEFAULT_INPUT_FILE)
""" Read Input Parameters """
self._iterate_storm = MPD.read_int( 'N_STORMS' )
self._vegcover = MPD.read_float( 'VEG_COV' )
self._interception_cap = MPD.read_float( 'INTERCEPT_CAP' )
self._zr = MPD.read_float( 'ZR' )
self._runon = MPD.read_float( 'RUNON' )
self._fbare = MPD.read_float( 'F_BARE' )
self._soil_Ib = MPD.read_float( 'I_B' )
self._soil_Iv = MPD.read_float( 'I_V' )
self._soil_Ew = MPD.read_float( 'EW' )
self._soil_pc = MPD.read_float( 'PC' )
self._soil_fc = MPD.read_float( 'FC' )
self._soil_sc = MPD.read_float( 'SC' )
self._soil_wp = MPD.read_float( 'WP' )
... light_attenuation_length: length scale for light attenuation, cells (1 cell = 1 mm)
... 2.0
... ''')
>>> tsbm = TurbulentSuspensionAndBleachingModel(p)
>>> tsbm.node_state
array([1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0])
>>> tsbm.grid.at_node['osl']
array([ 1., 1., 1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0.,
0., 0., 0.])
>>> tsbm.n_xn
array([0, 1, 1, 0, 0, 1, 1, 0])
>>> tsbm.fluid_surface_height
3.5
"""
# Get a source for input parameters.
params = ModelParameterDictionary(input_stream)
# Read user-defined parameters
nr = params.read_int('model_grid_row__count') # number of rows (CSDMS Standard Name [CSN])
nc = params.read_int('model_grid_column__count') # number of cols (CSN)
self.plot_interval = params.read_float('plot_interval') # interval for plotting output, s
self.run_duration = params.read_float('model__run_time') # duration of run, sec (CSN)
self.report_interval = params.read_float('model__report_interval') # report interval, in real-time seconds
self.bleach_T0 = params.read_float('surface_bleaching_time_scale') # time scale for bleaching at fluid surface, s
self.zstar = params.read_float('light_attenuation_length') # length scale for light attenuation in fluid, CELLS
# Derived parameters
self.fluid_surface_height = nr-0.5
# Calculate when we next want to report progress.
self.next_report = time.time() + self.report_interval
which is calculated as
Qc = 8.*C_MPM*(taustar - taustarcrit)**1.5
In almost all cases, tuning depth_equation_prefactor' is
preferred to tuning this parameter.
*return_capacity -> bool (default False). NOT YET IMPLEMENTED.
If True, this component
will save the calculated capacity in the field
'fluvial_sediment_transport_capacity'. (Requires some additional
math, so is suppressed for speed by default).
'''
self.grid = grid
self.link_S_with_trailing_blank = np.zeros(grid.number_of_links+1) #needs to be filled with values in execution
self.count_active_links = np.zeros_like(self.link_S_with_trailing_blank, dtype=int)
self.count_active_links[:-1] = 1
inputs = ModelParameterDictionary(params_file)
try:
self.g = inputs.read_float('g')
except MissingKeyError:
self.g = 9.81
try:
self.rock_density = inputs.read_float('rock_density')
except MissingKeyError:
self.rock_density = 2700.
try:
self.sed_density = inputs.read_float('sediment_density')
except MissingKeyError:
self.sed_density = 2700.
try:
self.fluid_density = inputs.read_float('fluid_density')
except MissingKeyError:
self.fluid_density = 1000.
@author: danhobley
"""
from __future__ import print_function
from six.moves import range
from landlab import RasterModelGrid, ModelParameterDictionary
from landlab.plot.imshow import imshow_node_grid
import numpy as np
from pylab import imshow, show, contour, figure, clabel, quiver, plot, close
from landlab.components.potentiality_flowrouting import PotentialityFlowRouter
from landlab.components.flow_routing import FlowRouter
from landlab.components.stream_power import FastscapeEroder
# from landlab.grid.mappers import map_link_end_node_max_value_to_link
inputs = ModelParameterDictionary('./pot_fr_params.txt')
nrows = 50#inputs.read_int('nrows')
ncols = 50#inputs.read_int('ncols')
dx = inputs.read_float('dx')
init_elev = inputs.read_float('init_elev')
mg = RasterModelGrid(nrows, ncols, dx)
# attempt to implement diffusion with flow routing...
#modify the fields in the grid
z = mg.zeros(at='node') + init_elev
mg.at_node['topographic__elevation'] = z + np.random.rand(len(z))/1000.
mg.add_zeros('water__unit_flux_in', at='node')
mg.add_zeros('water__discharge', at='link')
#Set boundary conditions
@author: danhobley
"""
from __future__ import print_function
from six.moves import range
from landlab import RasterModelGrid, ModelParameterDictionary
from landlab.plot.imshow import imshow_node_grid
import numpy as np
from pylab import imshow, show, contour, figure, clabel, quiver, plot, close
from landlab.components.potentiality_flowrouting import PotentialityFlowRouter
from landlab.components.flow_routing import FlowRouter
from landlab.components.stream_power import FastscapeEroder
# from landlab.grid.mappers import map_link_end_node_max_value_to_link
inputs = ModelParameterDictionary('./pot_fr_params.txt')
nrows = 50#inputs.read_int('nrows')
ncols = 50#inputs.read_int('ncols')
dx = inputs.read_float('dx')
init_elev = inputs.read_float('init_elev')
mg = RasterModelGrid(nrows, ncols, dx)
# attempt to implement diffusion with flow routing...
#modify the fields in the grid
z = mg.zeros(at='node') + init_elev
z_slope = (49000. - mg.node_y)/mg.node_y.max()/20.
mg.at_node['topographic__elevation'] = z + z_slope #+ np.random.rand(len(z))/1000.
mg.add_zeros('water__unit_flux_in', at='node')
mg.add_zeros('link', 'water__volume_flux_magnitude')
from __future__ import print_function
from landlab import RasterModelGrid
from landlab import ModelParameterDictionary
from landlab.components.sed_trp_shallow_flow import SurfaceFlowTransport
import time
import pylab
import numpy as np
#get the needed properties to build the grid:
inputs = ModelParameterDictionary('./stof_params.txt')
nrows = inputs.read_int('nrows')
ncols = inputs.read_int('ncols')
dx = inputs.read_float('dx')
h_init = inputs.read_float('h_init')
h_boundary = inputs.read_float('h_boundary')
drop_ht = inputs.read_float('drop_ht')
initial_slope = inputs.read_float('initial_slope')
time_to_run = inputs.read_int('run_time')
left_middle_node = ncols*(nrows/2)
z0 = drop_ht+dx*(ncols-1)*initial_slope
mg = RasterModelGrid(nrows, ncols, dx)
mg.set_inactive_boundaries(True, False, True, False)
#create the fields in the grid
def main():
# INITIALIZE
# Name of parameter input file
input_file_name = 'test_inputs_for_diffusion_model.txt'
# Open input file and read run-control parameters
mpd = ModelParameterDictionary(input_file_name)
run_duration = mpd.get('RUN_DURATION', ptype=float)
# Create and initialize a grid
mg = create_and_initialize_grid(mpd)
# Create and initialize a diffusion component
dc = LinearDiffuser(mg)
dc.initialize(mpd)
# RUN
# Run the diffusion component until it's time for the next output
dc.run_until(run_duration)
# FINALIZE