Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def kl_test():
    """Check that ``pyemu.utils.helpers.kl_setup`` hands back the arrays it was given.

    Loads the Freyberg MODFLOW model with flopy, passes a constant array of
    ones and the ``hk.truth.ref`` array to ``kl_setup`` (800 eigen components)
    and asserts that, for every array returned, the summed absolute difference
    from the corresponding input is below 1.0e-2.

    Returns early, after printing a notice, when flopy cannot be imported.
    """
    import os
    import numpy as np
    import pyemu

    try:
        import flopy
    except ImportError:
        # flopy is optional here; only a failed import should skip the test,
        # any other exception must surface instead of being swallowed.
        print("flopy not imported...")
        return

    freyberg_dir = os.path.join("..", "verification", "Freyberg")
    model_ws = os.path.join(freyberg_dir, "extra_crispy")
    ml = flopy.modflow.Modflow.load("freyberg.nam", model_ws=model_ws)
    str_file = os.path.join(freyberg_dir, "structure.dat")

    # input arrays keyed by the name used to look up the returned array
    arr_dict = {"test": np.ones((ml.nrow, ml.ncol))}
    arr_dict["hk_tru"] = np.loadtxt(os.path.join(model_ws, "hk.truth.ref"))

    basis_file = os.path.join("utils", "basis.dat")
    tpl_file = os.path.join("utils", "test.tpl")
    back_dict = pyemu.utils.helpers.kl_setup(num_eig=800, sr=ml.sr,
                                             struct_file=str_file,
                                             array_dict=arr_dict,
                                             basis_file=basis_file,
                                             tpl_file=tpl_file)

    for name, recovered in back_dict.items():
        diff = np.abs(arr_dict[name] - recovered).sum()
        print(diff)
        assert np.abs(diff) < 1.0e-2
def pcg_fmt_test():
    """Load one PCG file into an mf2k and an mf2005 model and compare settings.

    The same file (``pcg_fname``) is parsed by ``ModflowPcg.load`` for both
    model versions; the test passes when ``rclose`` and ``damp`` agree.
    """
    pcg_loader = fp.modflow.ModflowPcg.load

    # mf2k container - this will pass
    model_2k = fp.modflow.Modflow(version='mf2k')
    model_2k.pcg = pcg_loader(model=model_2k, f=pcg_fname)

    # mf2005 container
    model_05 = fp.modflow.Modflow(version='mf2005')
    model_05.pcg = pcg_loader(model=model_05, f=pcg_fname)

    # this will exit with ValueError without the except block added in pull req
    for setting in ("rclose", "damp"):
        assert getattr(model_2k.pcg, setting) == getattr(model_05.pcg, setting)
def test_util2d_external_free():
    """Run ``stress_util2d`` on a fresh model for eight size combinations.

    The workspace ``<out_dir>/extra_temp`` is removed if it already exists and
    recreated empty before the model is built.
    """
    model_ws = os.path.join(out_dir, "extra_temp")
    # start from an empty workspace directory
    if os.path.exists(model_ws):
        shutil.rmtree(model_ws)
    os.mkdir(model_ws)

    ml = flopy.modflow.Modflow(model_ws=model_ws)

    # every mix of 1 and 10 for the three size arguments, in the original order
    # (presumably nlay, nrow, ncol -- confirm against stress_util2d)
    size_cases = (
        (1, 1, 1),
        (10, 1, 1),
        (1, 10, 1),
        (1, 1, 10),
        (10, 10, 1),
        (1, 10, 10),
        (10, 1, 10),
        (10, 10, 10),
    )
    for sizes in size_cases:
        stress_util2d(ml, *sizes)
# NOTE(review): the ``def`` line that encloses this snippet is not visible
# here; these statements build a small transient MODFLOW model in
# ./temp/t068a and start a recharge-rate dictionary.

# grid dimensions and ten stress periods of length 10
nlay, nrow, ncol = 3, 5, 5
# ``np.float`` / ``np.int`` were plain aliases of the builtins and were
# removed in NumPy 1.24; the builtin types yield the same dtypes.
perlen = np.zeros((10), dtype=float) + 10
nper = len(perlen)
ibound = np.ones((nlay, nrow, ncol), dtype=int)
botm = np.arange(-1, -4, -1)
top = 0.

# creating MODFLOW model
model_ws = os.path.join('.', 'temp', 't068a')
modelname = 'model_mf'
mf = flopy.modflow.Modflow(modelname, model_ws=model_ws,
                           exe_name=mf_exe_name)
dis = flopy.modflow.ModflowDis(mf, nlay=nlay, nrow=nrow, ncol=ncol,
                               perlen=perlen, nper=nper, botm=botm, top=top,
                               steady=False)
bas = flopy.modflow.ModflowBas(mf, ibound=ibound, strt=top)
lpf = flopy.modflow.ModflowLpf(mf, hk=100, vka=100, ss=0.00001, sy=0.1)
oc = flopy.modflow.ModflowOc(mf)
pcg = flopy.modflow.ModflowPcg(mf)
lmt = flopy.modflow.ModflowLmt(mf)

# recharge: values keyed by integer index (presumably the stress period
# at which the rate changes -- confirm against the code that consumes it)
rchrate = {}
rchrate[0] = 0.0
rchrate[5] = 0.001
rchrate[6] = 0.0
def setup_freyberg_transient_model():
    """Assemble a 365-period, one-layer MODFLOW-NWT model from the Freyberg example.

    The existing model in ``../examples/freyberg_sfr_update`` is loaded and its
    grid, properties, recharge, wells and SFR data are reused to populate a new
    ``freyberg_transient`` model whose workspace is ``da/freyberg/truth``.
    """
    source = flopy.modflow.Modflow.load(
        "freyberg.nam",
        model_ws=os.path.join("..", "examples", "freyberg_sfr_update"),
        check=False,
        forgive=False,
    )

    # one entry per stress period, each of length 1
    period_lengths = np.ones((365))

    target = flopy.modflow.Modflow(
        "freyberg_transient",
        model_ws=os.path.join("da", "freyberg", "truth"),
        version="mfnwt",
        external_path=".",
    )

    # discretisation: same rows/columns as the source, collapsed to one layer
    # that uses the source's last bottom array
    flopy.modflow.ModflowDis(
        target,
        nrow=source.nrow,
        ncol=source.ncol,
        nlay=1,
        delr=source.dis.delr,
        delc=source.dis.delc,
        top=source.dis.top,
        botm=source.dis.botm[-1],
        nper=len(period_lengths),
        perlen=period_lengths,
    )
    flopy.modflow.ModflowBas(
        target, ibound=source.bas6.ibound[0], strt=source.bas6.strt[0]
    )
    flopy.modflow.ModflowUpw(
        target,
        laytyp=source.upw.laytyp,
        hk=source.upw.hk[0],
        vka=source.upw.vka[0],
        ss=0.00001,
        sy=0.01,
    )
    flopy.modflow.ModflowNwt(target)

    # request head and budget output at index 0 of every stress period
    output_requests = {
        (period, 0): ["save head", "save budget"]
        for period in range(target.nper)
    }
    flopy.modflow.ModflowOc(target, stress_period_data=output_requests)

    flopy.modflow.ModflowRch(target, rech=source.rch.rech.array[0])

    # first-period wells of the source, with every layer index set to 0
    first_period_wells = source.wel.stress_period_data[0]
    first_period_wells["k"][:] = 0
    flopy.modflow.ModflowWel(target, stress_period_data={0: first_period_wells})

    flopy.modflow.ModflowSfr2(
        target,
        nstrm=source.sfr.nstrm,
        nss=source.sfr.nss,
        istcb2=90,
        segment_data=source.sfr.segment_data,
        reach_data=source.sfr.reach_data,
    )
def test_none_spdtype():
    """Verify that ``-1`` and ``None`` are accepted as stress-period entries.

    A two-period model with a WEL package whose stress period dictionary is
    ``{0: -1, 1: None}`` is written to ``./temp/t068c``, loaded back, and -
    when the surrounding ``run`` flag is truthy - executed, in which case the
    run must report success.
    """
    model_ws = os.path.join('.', 'temp', 't068c')
    mf = flopy.modflow.Modflow(model_ws=model_ws, exe_name=mf_exe_name)

    # packages attach themselves to ``mf``; the returned objects are not needed
    flopy.modflow.ModflowDis(mf, nper=2)
    flopy.modflow.ModflowBas(mf)
    flopy.modflow.ModflowLpf(mf)
    flopy.modflow.ModflowWel(mf, stress_period_data={0: -1, 1: None})
    flopy.modflow.ModflowPcg(mf)

    mf.write_input()

    # loading the files just written must not raise
    flopy.modflow.Modflow.load('modflowtest.nam', model_ws=model_ws,
                               verbose=True)

    if not run:
        return
    success, buff = mf.run_model(report=True)
    assert success
def fieldgen_dev():
    """Development scaffold: prepare a Freyberg PEST setup and an anisotropic geostruct.

    Loads the Freyberg example model, adds a RIV package, rewrites the model
    into ``temp``, builds a ``PstFromFlopyModel`` in ``temp_fieldgen`` with
    grid-scale parameters for ``upw.hk`` and ``rch.rech``, and defines an
    exponential-variogram ``GeoStruct`` mapped to the names ``"hk"`` and
    ``"ss"``.

    Returns silently when flopy cannot be imported.
    """
    # ``os`` is used below; import it locally like every other dependency of
    # this function so the function does not rely on a module-level import.
    import os
    import shutil
    import numpy as np
    import pandas as pd
    try:
        import flopy
    except ImportError:
        # only a missing flopy should skip the routine; other errors surface
        return
    import pyemu

    org_model_ws = os.path.join("..", "examples", "freyberg_sfr_update")
    nam_file = "freyberg.nam"
    m = flopy.modflow.Modflow.load(nam_file, model_ws=org_model_ws, check=False)

    # NOTE(review): the second and third river rows are identical in the
    # original -- kept as-is; confirm whether the duplicate is intentional.
    flopy.modflow.ModflowRiv(m, stress_period_data={0: [[0, 0, 0, 30.0, 1.0, 25.0],
                                                        [0, 0, 1, 31.0, 1.0, 25.0],
                                                        [0, 0, 1, 31.0, 1.0, 25.0]]})

    # write the modified model to a scratch workspace that the PEST setup reads
    org_model_ws = "temp"
    m.change_model_ws(org_model_ws)
    m.write_input()

    new_model_ws = "temp_fieldgen"
    ph = pyemu.helpers.PstFromFlopyModel(nam_file, new_model_ws=new_model_ws,
                                         org_model_ws=org_model_ws,
                                         grid_props=[["upw.hk", 0], ["rch.rech", 0]],
                                         remove_existing=True, build_prior=False)

    v = pyemu.geostats.ExpVario(1.0, 1000, anisotropy=10, bearing=45)
    gs = pyemu.geostats.GeoStruct(nugget=0.0, variograms=v, name="aniso")
    struct_dict = {gs: ["hk", "ss"]}
-------
``>>>import flopy``
``>>>from pyemu.utils import setup_pilotpoints_grid``
``>>>m = flopy.modflow.Modflow.load("mymodel.nam")``
``>>>setup_pilotpoints_grid(m,prefix_dict={0:['hk_'],1:['vka_']},``
``>>> every_n_cell=3,shapename='layer1_pp.shp')``
"""
# NOTE(review): fragment -- the enclosing function signature is not visible
# here (presumably setup_pilotpoints_grid, per the usage example above).
# Resolves the spatial reference ``sr`` and the ``ibound`` mask from either a
# flopy model ``ml`` or from explicitly passed arguments.
import flopy
if ml is not None:
# a model was supplied: it must be a flopy Modflow instance
assert isinstance(ml,flopy.modflow.Modflow)
sr = ml.sr
if ibound is None:
# fall back to the ibound array of the model's BAS6 package
ibound = ml.bas6.ibound.array
else:
assert sr is not None,"if 'ml' is not passed, 'sr' must be passed"
if ibound is None:
print("ibound not passed, using array of ones")
# one (nrow, ncol) array of ones per key of prefix_dict
ibound = {k:np.ones((sr.nrow,sr.ncol)) for k in prefix_dict.keys()}
#assert ibound is not None,"if 'ml' is not pass, 'ibound' must be passed"
# an ndarray ibound must be 2-D or 3-D; a 2-D array is wrapped as {0: array}
if isinstance(ibound, np.ndarray):
assert np.ndim(ibound) == 2 or np.ndim(ibound) == 3, \
"ibound needs to be either 3d np.ndarry or k_dict of 2d arrays. " \
"Array of {0} dimensions passed".format(np.ndim(ibound))
if np.ndim(ibound) == 2:
ibound = {0: ibound}
# NOTE(review): fragment -- the enclosing function and the loop that supplies
# ``kstpkper`` are not visible here. The visible code fills ``m4d`` with
# arrays read from ``cbf`` and then builds a ModflowWel package from them.
# subtract one from both entries of the (kstp, kper) pair
# (presumably converting 1-based to 0-based indices -- confirm)
kstpkper = (kstpkper[0] - 1, kstpkper[1] - 1)
kper = kstpkper[1]
# if we haven't visited this kper yet
if kper != iper:
arr = cbf.get_data(kstpkper=kstpkper, text=text, full3D=True)
if len(arr) > 0:
# keep only the first item returned by get_data
arr = arr[0]
print(arr.max(), arr.min(), arr.sum())
# masked where zero
# NOTE(review): the ``np.NaN`` alias was removed in NumPy 2.0; ``np.nan`` is
# the supported spelling.
arr[np.where(arr == 0.0)] = np.NaN
m4d[iper + 1] = arr
iper += 1
# model wasn't passed, then create a generic model
if model is None:
model = Modflow("test")
# if model doesn't have a wel package, then make a generic one...
# need this for the from_m4d method
if model.wel is None:
ModflowWel(model)
# get the stress_period_data dict {kper:np recarray}
sp_data = MfList.from_4d(model, "WEL", {"flux": m4d})
# build the package that is returned, using the "flux" data collected above
wel = ModflowWel(model, stress_period_data=sp_data)
return wel
# NOTE(review): fragment of an initializer -- the ``def`` line and any code
# before this point are not visible here.
# Set attributes
self.version_types = {'seawat': 'SEAWAT'}
self.set_version(version)
self.lst = SeawatList(self, listunit=listunit)
self.glo = None
self._mf = None
self._mt = None
# If a MODFLOW model was passed in, then add its packages
# (``mf`` is pointed at this object itself before the packages are copied)
self.mf = self
if modflowmodel is not None:
for p in modflowmodel.packagelist:
self.packagelist.append(p)
# reuse the model grid of the supplied MODFLOW model
self._modelgrid = modflowmodel.modelgrid
else:
# NOTE(review): in the visible code this new Modflow() is only bound to
# the local name and not used afterwards -- confirm against the full method.
modflowmodel = Modflow()
# If a MT3D model was passed in, then add its packages
if mt3dmodel is not None:
for p in mt3dmodel.packagelist:
self.packagelist.append(p)
else:
# NOTE(review): same pattern as above -- only the local name is rebound.
mt3dmodel = Mt3dms()
# external option stuff
self.array_free_format = False
self.array_format = 'mt3d'
self.external_fnames = []
self.external_units = []
self.external_binflag = []
self.external = False
self.load = load