Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_model(data):
    """Read the observation series and the optional model options from ``data``.

    Parameters
    ----------
    data : dict
        Must contain the key ``"oseries"`` (keyword arguments for
        ``ps.TimeSeries``). May contain ``"constant"``, ``"metadata"`` and
        ``"name"``; when absent they default to ``False``, ``None`` and
        ``None`` respectively.

    Notes
    -----
    Only the opening part of this function is visible in this excerpt.
    """
    # Create model
    oseries = ps.TimeSeries(**data["oseries"])

    # Optional entries fall back to their defaults when the key is missing.
    constant = data.get("constant", False)
    metadata = data.get("metadata", None)
    name = data.get("name", None)
This is the time at which precipitation is logged at Dutch KNMI stations.
"""
import pastas as ps
import pandas as pd
# Load the observations from the DINO csv export
obs = ps.read_dino('data/B58C0698001_1.csv')

# Build the time series model on those observations
ml = ps.Model(obs)

# Load the weather data: 'RD' from the KNMI station file as precipitation,
# 'EV24' from the KNMI file as evaporation
knmi = ps.read.knmi.KnmiStation.fromfile(
    'data/neerslaggeg_HEIBLOEM-L_967-2.txt')
rain = ps.TimeSeries(knmi.data['RD'], settings='prec')
evap = ps.read_knmi('data/etmgeg_380.txt', variables='EV24')

if True:
    # Shift the evaporation index forward by 9 hours as well; the series is
    # written back through the attribute after its index has been replaced.
    shifted = evap.series_original
    shifted.index = shifted.index + pd.to_timedelta(9, 'h')
    evap.series_original = shifted

# Combine precipitation and evaporation into one stress model named 'recharge'
sm = ps.StressModel2(
    stress=[rain, evap],
    rfunc=ps.Exponential,
    name='recharge',
)
ml.add_stressmodel(sm)

# Set the time offset of the model by hand.
# This should be done automatically in the future.
ml._set_time_offset()
metadata=metadata)
# Merge the stored settings and file information into the model, when present.
if "settings" in data:
    ml.settings.update(data["settings"])
if "file_info" in data:
    ml.file_info.update(data["file_info"])

# Add stressmodels
for name, ts in data["stressmodels"].items():
    # Build the constructor arguments on a shallow copy: popping keys from and
    # writing objects into ``ts`` itself would alter the caller's ``data`` and
    # make a second pass over the same dict fail.
    kwargs = dict(ts)
    stressmodel = getattr(ps.stressmodels, kwargs.pop("stressmodel"))
    if "rfunc" in kwargs:
        # The response function is stored by name; look up the class.
        kwargs["rfunc"] = getattr(ps.rfunc, kwargs["rfunc"])
    if "stress" in kwargs:
        # New list, so the list stored in ``data`` keeps its plain entries.
        kwargs["stress"] = [ps.TimeSeries(**stress)
                            for stress in kwargs["stress"]]
    for key in ("prec", "evap"):
        if key in kwargs:
            kwargs[key] = ps.TimeSeries(**kwargs[key])
    # "temp" may be present but explicitly set to None; leave it as is then.
    if kwargs.get("temp") is not None:
        kwargs["temp"] = ps.TimeSeries(**kwargs["temp"])
    stressmodel = stressmodel(**kwargs)
    ml.add_stressmodel(stressmodel)

# Add transform
if "transform" in data:
    # Same reasoning as above: copy before removing the class-name entry.
    transform_kwargs = dict(data["transform"])
    transform = getattr(ps.transform, transform_kwargs.pop("transform"))
    transform = transform(**transform_kwargs)
    ml.add_transform(transform)
# Add stressmodels
for name, ts in data["stressmodels"].items():
    # Build the constructor arguments on a shallow copy: popping keys from and
    # writing objects into ``ts`` itself would alter the caller's ``data`` and
    # make a second pass over the same dict fail.
    kwargs = dict(ts)
    stressmodel = getattr(ps.stressmodels, kwargs.pop("stressmodel"))
    if "rfunc" in kwargs:
        # The response function is stored by name; look up the class.
        kwargs["rfunc"] = getattr(ps.rfunc, kwargs["rfunc"])
    if "stress" in kwargs:
        # New list, so the list stored in ``data`` keeps its plain entries.
        kwargs["stress"] = [ps.TimeSeries(**stress)
                            for stress in kwargs["stress"]]
    for key in ("prec", "evap"):
        if key in kwargs:
            kwargs[key] = ps.TimeSeries(**kwargs[key])
    # "temp" may be present but explicitly set to None; leave it as is then.
    if kwargs.get("temp") is not None:
        kwargs["temp"] = ps.TimeSeries(**kwargs["temp"])
    stressmodel = stressmodel(**kwargs)
    ml.add_stressmodel(stressmodel)

# Add transform
if "transform" in data:
    # Copy before removing the class-name entry so ``data`` stays intact.
    transform_kwargs = dict(data["transform"])
    transform = getattr(ps.transform, transform_kwargs.pop("transform"))
    transform = transform(**transform_kwargs)
    ml.add_transform(transform)

# Add noisemodel if present
if "noisemodel" in data:
    # The noise model class is looked up by its stored type name and
    # instantiated without arguments.
    n = getattr(ps.noisemodels, data["noisemodel"]["type"])()
    ml.add_noisemodel(n)
# Add fit object to the model
# Merge the stored file information into the model, when present.
if "file_info" in data:
    ml.file_info.update(data["file_info"])

# Add stressmodels
for name, ts in data["stressmodels"].items():
    # Build the constructor arguments on a shallow copy: popping keys from and
    # writing objects into ``ts`` itself would alter the caller's ``data`` and
    # make a second pass over the same dict fail.
    kwargs = dict(ts)
    stressmodel = getattr(ps.stressmodels, kwargs.pop("stressmodel"))
    if "rfunc" in kwargs:
        # The response function is stored by name; look up the class.
        kwargs["rfunc"] = getattr(ps.rfunc, kwargs["rfunc"])
    if "stress" in kwargs:
        # New list, so the list stored in ``data`` keeps its plain entries.
        kwargs["stress"] = [ps.TimeSeries(**stress)
                            for stress in kwargs["stress"]]
    for key in ("prec", "evap"):
        if key in kwargs:
            kwargs[key] = ps.TimeSeries(**kwargs[key])
    # "temp" may be present but explicitly set to None; leave it as is then.
    if kwargs.get("temp") is not None:
        kwargs["temp"] = ps.TimeSeries(**kwargs["temp"])
    stressmodel = stressmodel(**kwargs)
    ml.add_stressmodel(stressmodel)

# Add transform
if "transform" in data:
    # Copy before removing the class-name entry so ``data`` stays intact.
    transform_kwargs = dict(data["transform"])
    transform = getattr(ps.transform, transform_kwargs.pop("transform"))
    transform = transform(**transform_kwargs)
    ml.add_transform(transform)

# Add noisemodel if present
if "noisemodel" in data:
    # The noise model class is looked up by its stored type name and
    # instantiated without arguments.
    n = getattr(ps.noisemodels, data["noisemodel"]["type"])()
    ml.add_noisemodel(n)
# Tail of a flat example script. `IN`, `meny`, `ml` and `ps` are used here
# without being defined in this excerpt; they are expected to be bound earlier.
# Label the series currently bound to `IN` as precipitation.
IN.name = 'Precipitation'
# Fetch the evaporation values, round their index with frequency "D" and
# label the series.
IN2 = meny.IN['Evaporation']['values']
IN2.index = IN2.index.round("D")
IN2.name = 'Evaporation'
# Combine both series into one stress model named 'Recharge' using ps.Gamma.
sm = ps.StressModel2([IN, IN2], ps.Gamma, 'Recharge')
ml.add_stressmodel(sm)
# Add well extraction 1 (disabled: kept as commented-out code)
# IN = meny.IN['Extraction 1']
# well = ps.TimeSeries(IN["values"], freq_original="M", settings="well")
# # extraction amount counts for the previous month
# sm = ps.StressModel(well, ps.Hantush, 'Extraction_1', up=False)
# Add well extraction 2 (note: `IN` and `well` are rebound for each well)
IN = meny.IN['Extraction 2']
well = ps.TimeSeries(IN["values"], freq_original="M", settings="well")
# extraction amount counts for the previous month
sm1 = ps.StressModel(well, ps.Hantush, 'Extraction_2', up=False)
# Add well extraction 3
IN = meny.IN['Extraction 3']
well = ps.TimeSeries(IN["values"], freq_original="M", settings="well")
# extraction amount counts for the previous month
sm2 = ps.StressModel(well, ps.Hantush, 'Extraction_3', up=False)
# add_stressmodel is called here with several stress models in a single call.
# NOTE(review): `sm` was already passed to ml.add_stressmodel above and is
# passed again here - confirm that registering it twice is intended.
ml.add_stressmodel(sm, sm1, sm2)
# Solve the model with tmax="1995", then plot the decomposition.
ml.solve(tmax="1995")
ax = ml.plots.decomposition(ytick_base=1.)