Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from proteus.iproteus import *
from proteus import Comm
from proteus.Domain import InterpolatedBathymetryDomain
from proteus.MeshTools import InterpolatedBathymetryMesh
from proteus.Archiver import XdmfArchive
import xml.etree.ElementTree as ElementTree
comm = Comm.init()
Profiling.logLevel=7
Profiling.verbose=True
def setupStepGauss():
import numpy as np
from math import sin,cos,pi,sqrt,exp
#set up a fake LiDAR point set
nPoints_x = nPoints_y = 51
delta_x = 2.0/float(nPoints_x-1)
delta_y = 2.0/float(nPoints_y-1)
bathy = np.zeros((nPoints_x*nPoints_y,3),'d')
for i in range(nPoints_y):
for j in range(nPoints_x):
x = -0.5+j*delta_x
y = -0.5+i*delta_y
#z = 1.0
np.savetxt("deim_approx_errors_mass_test_T={0}_nDT={1}_m={2}.dat".format(T,nDTout,m_mass),errors_mass)
return errors,errors_mass,F_deim,Fm_deim
def test_deim_approx_full(tol=1.0e-12):
"""
check that get very small error if use full basis
"""
T = 0.1; nDTout=10; m=nDTout+1
errors,errors_mass,F_deim,Fm_deim = deim_approx(T=T,nDTout=nDTout,m=m,m_mass=m)
assert errors.min() < tol
assert errors_mass.min() < tol
if __name__ == "__main__":
from proteus import Comm
comm = Comm.init()
import nose
nose.main(defaultTest='test_deim:test_deim_approx_full')
def __init__(self,so,pList,nList,sList,opts,simFlagsList=None,TwoPhaseFlow=False):
from . import Comm
comm=Comm.get()
self.comm=comm
message = "Initializing NumericalSolution for "+so.name+"\n System includes: \n"
for p in pList:
message += p.name+"\n"
logEvent(message)
#: SplitOperator initialize file
self.so=so
#: List of physics initialize files
self.pList=pList
#: List of numerics initialize files
self.nList=nList
#: Dictionary of command line arguments
self.opts=opts
self.simFlagsList=simFlagsList
self.TwoPhaseFlow=TwoPhaseFlow
self.timeValues={}
"OneLevelTransport"), level=4)
# mwf for interpolating subgrid error for gradients etc
if self.stabilization and self.stabilization.usesGradientStabilization:
self.timeIntegration = TimeIntegrationClass(
self, integrateInterpolationPoints=True)
else:
self.timeIntegration = TimeIntegrationClass(self)
if options is not None:
self.timeIntegration.setFromOptions(options)
log(memory("TimeIntegration", "OneLevelTransport"), level=4)
log("Calculating numerical quadrature formulas", 2)
self.calculateQuadrature()
self.setupFieldStrides()
comm = Comm.get()
self.comm = comm
if comm.size() > 1:
assert numericalFluxType is not None and numericalFluxType.useWeakDirichletConditions, "You must use a numerical flux to apply weak boundary conditions for parallel runs"
# STIFFNESS MATRIX #
self.interface_lumpedMassMatrix = numpy.zeros(self.u[0].dof.shape,'d')
self.stiffness_matrix_array = None
self.stiffness_matrix = None
log(memory("stride+offset", "OneLevelTransport"), level=4)
if numericalFluxType is not None:
if options is None or options.periodicDirichletConditions is None:
self.numericalFlux = numericalFluxType(
self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
self.timeIntegration.beta_bdf[0],#mwf was self.timeIntegration.m_last[0],
self.q[('cfl',0)],
self.shockCapturing.numDiff[0],
self.shockCapturing.numDiff_last[0],
self.offset[0],self.stride[0],
self.mesh.nExteriorElementBoundaries_global,
self.mesh.exteriorElementBoundariesArray,
self.mesh.elementBoundaryElementsArray,
self.mesh.elementBoundaryLocalElementBoundariesArray,
self.mesh.elementBoundaryMaterialTypes,
self.coefficients.ebqe_v,
self.numericalFlux.isDOFBoundary[0],
self.numericalFlux.ebqe[('u',0)],
self.ebqe[('u',0)])
from proteus import Comm
comm = Comm.get()
filename = os.path.join(self.coefficients.opts.dataDir, "waterline." + str(comm.rank()) + "." + str(self.waterline_prints))
numpy.save(filename, self.waterline_data[0:self.waterline_npoints[0]])
self.waterline_prints += 1
def updateAfterMeshMotion(self):
"""
self.isosurfaces = isosurfaces
self.domain = domain
self.nodes = {}
self.elements = {}
self.normals = {}
self.normal_indices = {}
for f, values in self.isosurfaces:
for v in values:
assert v == 0.0, \
"only implemented for 0 isosurface in 3D for now"
self.activeTime = activeTime
self.sampleRate = sampleRate
self.format = format
self.comm = Comm.get()
self.writeBoundary = writeBoundary
logEvent("Building time integration object",2)
logEvent(memory("inflowBC, internalNodes,updateLocal2Global","OneLevelTransport"),level=4)
#mwf for interpolating subgrid error for gradients etc
if self.stabilization and self.stabilization.usesGradientStabilization:
self.timeIntegration = TimeIntegrationClass(self,integrateInterpolationPoints=True)
else:
self.timeIntegration = TimeIntegrationClass(self)
if options != None:
self.timeIntegration.setFromOptions(options)
logEvent(memory("TimeIntegration","OneLevelTransport"),level=4)
logEvent("Calculating numerical quadrature formulas",2)
self.calculateQuadrature()
self.setupFieldStrides()
comm = Comm.get()
self.comm=comm
if comm.size() > 1:
assert numericalFluxType != None and numericalFluxType.useWeakDirichletConditions,"You must use a numerical flux to apply weak boundary conditions for parallel runs"
logEvent(memory("stride+offset","OneLevelTransport"),level=4)
if numericalFluxType != None:
if options == None or options.periodicDirichletConditions == None:
self.numericalFlux = numericalFluxType(self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict)
else:
self.numericalFlux = numericalFluxType(self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict,
logEvent("Building time integration object", 2)
logEvent(memory("inflowBC, internalNodes,updateLocal2Global", "OneLevelTransport"), level=4)
# mwf for interpolating subgrid error for gradients etc
if self.stabilization and self.stabilization.usesGradientStabilization:
self.timeIntegration = TimeIntegrationClass(self, integrateInterpolationPoints=True)
else:
self.timeIntegration = TimeIntegrationClass(self)
if options is not None:
self.timeIntegration.setFromOptions(options)
logEvent(memory("TimeIntegration", "OneLevelTransport"), level=4)
logEvent("Calculating numerical quadrature formulas", 2)
self.calculateQuadrature()
self.setupFieldStrides()
comm = Comm.get()
self.comm = comm
if comm.size() > 1:
assert numericalFluxType is not None and numericalFluxType.useWeakDirichletConditions, "You must use a numerical flux to apply weak boundary conditions for parallel runs"
logEvent(memory("stride+offset", "OneLevelTransport"), level=4)
if numericalFluxType is not None:
if options is None or options.periodicDirichletConditions is None:
self.numericalFlux = numericalFluxType(self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict)
else:
self.numericalFlux = numericalFluxType(self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict,
self.timeIntegration.beta_bdf[0],
self.q[('cfl', 0)],
self.shockCapturing.numDiff[0],
self.shockCapturing.numDiff_last[0],
self.offset[0], self.stride[0],
self.mesh.nExteriorElementBoundaries_global,
self.mesh.exteriorElementBoundariesArray,
self.mesh.elementBoundaryElementsArray,
self.mesh.elementBoundaryLocalElementBoundariesArray,
self.mesh.elementBoundaryMaterialTypes,
self.coefficients.ebqe_v,
self.numericalFlux.isDOFBoundary[0],
self.numericalFlux.ebqe[('u', 0)],
self.ebqe[('u', 0)])
from proteus import Comm
comm = Comm.get()
filename = os.path.join(self.coefficients.opts.dataDir,
"waterline." + str(comm.rank()) + "." + str(self.waterline_prints))
numpy.save(
filename, self.waterline_data[
0:self.waterline_npoints[0]])
self.waterline_prints += 1
logEvent("Building time integration object", 2)
logEvent(memory("inflowBC, internalNodes,updateLocal2Global", "OneLevelTransport"), level=4)
# mwf for interpolating subgrid error for gradients etc
if self.stabilization and self.stabilization.usesGradientStabilization:
self.timeIntegration = TimeIntegrationClass(self, integrateInterpolationPoints=True)
else:
self.timeIntegration = TimeIntegrationClass(self)
if options is not None:
self.timeIntegration.setFromOptions(options)
logEvent(memory("TimeIntegration", "OneLevelTransport"), level=4)
logEvent("Calculating numerical quadrature formulas", 2)
self.calculateQuadrature()
self.setupFieldStrides()
comm = Comm.get()
self.comm = comm
if comm.size() > 1:
assert numericalFluxType is not None and numericalFluxType.useWeakDirichletConditions, "You must use a numerical flux to apply weak boundary conditions for parallel runs"
logEvent(memory("stride+offset", "OneLevelTransport"), level=4)
if numericalFluxType is not None:
if options is None or options.periodicDirichletConditions is None:
self.numericalFlux = numericalFluxType(self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict)
else:
self.numericalFlux = numericalFluxType(self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict,