Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from proteus.iproteus import *
from proteus import Comm
from proteus.Domain import InterpolatedBathymetryDomain
from proteus.MeshTools import InterpolatedBathymetryMesh
from proteus.Archiver import XdmfArchive
import xml.etree.ElementTree as ElementTree
comm = Comm.init()
Profiling.logLevel=7
Profiling.verbose=True
def setupStepGauss():
import numpy as np
from math import sin,cos,pi,sqrt,exp
#set up a fake LiDAR point set
nPoints_x = nPoints_y = 51
delta_x = 2.0/float(nPoints_x-1)
delta_y = 2.0/float(nPoints_y-1)
bathy = np.zeros((nPoints_x*nPoints_y,3),'d')
for i in range(nPoints_y):
for j in range(nPoints_x):
x = -0.5+j*delta_x
y = -0.5+i*delta_y
#z = 1.0
np.savetxt("deim_approx_errors_mass_test_T={0}_nDT={1}_m={2}.dat".format(T,nDTout,m_mass),errors_mass)
return errors,errors_mass,F_deim,Fm_deim
def test_deim_approx_full(tol=1.0e-12):
"""
check that get very small error if use full basis
"""
T = 0.1; nDTout=10; m=nDTout+1
errors,errors_mass,F_deim,Fm_deim = deim_approx(T=T,nDTout=nDTout,m=m,m_mass=m)
assert errors.min() < tol
assert errors_mass.min() < tol
if __name__ == "__main__":
from proteus import Comm
comm = Comm.init()
import nose
nose.main(defaultTest='test_deim:test_deim_approx_full')
if not so.useOneMesh:
so.useOneArchive=False
logEvent("Setting Archiver(s)")
if so.useOneArchive:
self.femSpaceWritten={}
tmp = Archiver.XdmfArchive(opts.dataDir,so.name,useTextArchive=opts.useTextArchive,
gatherAtClose=opts.gatherArchive,hotStart=opts.hotStart,
useGlobalXMF=(not opts.subdomainArchives),
global_sync=opts.global_sync)
self.ar = dict([(i,tmp) for i in range(len(self.pList))])
elif len(self.pList) == 1:
self.ar = {0:Archiver.XdmfArchive(opts.dataDir,so.name,useTextArchive=opts.useTextArchive,
gatherAtClose=opts.gatherArchive,hotStart=opts.hotStart)} #reuse so.name if possible
else:
self.ar = dict([(i,Archiver.XdmfArchive(opts.dataDir,p.name,useTextArchive=opts.useTextArchive,
gatherAtClose=opts.gatherArchive,hotStart=opts.hotStart)) for i,p in enumerate(self.pList)])
#by default do not save quadrature point info
self.archive_q = dict([(i,False) for i in range(len(self.pList))]);
self.archive_ebq_global = dict([(i,False) for i in range(len(self.pList))]);
self.archive_ebqe = dict([(i,False) for i in range(len(self.pList))]);
self.archive_pod_residuals = dict([(i,False) for i in range(len(self.pList))]);
if simFlagsList is not None:
assert len(simFlagsList) == len(self.pList), "len(simFlagsList) = %s should be %s " % (len(simFlagsList),len(self.pList))
for index in range(len(self.pList)):
if 'storeQuantities' in simFlagsList[index]:
for quant in [a for a in simFlagsList[index]['storeQuantities'] if a is not None]:
recType = quant.split(':')
if len(recType) > 1 and recType[0] == 'q':
self.archive_q[index] = True
elif len(recType) > 1 and recType[0] == 'ebq_global':
self.archive_ebq_global[index] = True
self.ebq_global['penalty'][ebN, k] = old_div(self.numericalFlux.penalty_constant, (
self.mesh.elementBoundaryDiametersArray[ebN]**self.numericalFlux.penalty_power))
# penalty term
# cek move to Numerical flux initialization
if 'penalty' in self.ebqe:
for ebNE in range(self.mesh.nExteriorElementBoundaries_global):
ebN = self.mesh.exteriorElementBoundariesArray[ebNE]
for k in range(
self.nElementBoundaryQuadraturePoints_elementBoundary):
self.ebqe['penalty'][ebNE, k] = old_div(self.numericalFlux.penalty_constant, \
self.mesh.elementBoundaryDiametersArray[ebN]**self.numericalFlux.penalty_power)
log(memory("numericalFlux", "OneLevelTransport"), level=4)
self.elementEffectiveDiametersArray = self.mesh.elementInnerDiametersArray
# use post processing tools to get conservative fluxes, None by default
from proteus import PostProcessingTools
self.velocityPostProcessor = PostProcessingTools.VelocityPostProcessingChooser(
self)
log(memory("velocity postprocessor", "OneLevelTransport"), level=4)
# helper for writing out data storage
from proteus import Archiver
self.elementQuadratureDictionaryWriter = Archiver.XdmfWriter()
self.elementBoundaryQuadratureDictionaryWriter = Archiver.XdmfWriter()
self.exteriorElementBoundaryQuadratureDictionaryWriter = Archiver.XdmfWriter()
self.globalResidualDummy = None
compKernelFlag = 0
# if self.coefficients.useConstantH:
# self.elementDiameter = self.mesh.elementDiametersArray.copy()
# self.elementDiameter[:] = max(self.mesh.elementDiametersArray)
# else:
# self.elementDiameter = self.mesh.elementDiametersArray
self.elementDiameter = self.mesh.elementDiametersArray
self.presinit = cPresInit.PresInit(
for ebNE in range(self.mesh.nExteriorElementBoundaries_global):
ebN = self.mesh.exteriorElementBoundariesArray[ebNE]
eN_global = self.mesh.elementBoundaryElementsArray[ebN, 0]
ebN_element = self.mesh.elementBoundaryLocalElementBoundariesArray[ebN, 0]
for i in range(self.mesh.nNodes_element):
if i != ebN_element:
I = self.mesh.elementNodesArray[eN_global, i]
self.internalNodes -= set([I])
self.nNodes_internal = len(self.internalNodes)
self.internalNodesArray = np.zeros((self.nNodes_internal,), 'i')
for nI, n in enumerate(self.internalNodes):
self.internalNodesArray[nI] = n
#
del self.internalNodes
self.internalNodes = None
logEvent("Updating local to global mappings", 2)
self.updateLocal2Global()
logEvent("Building time integration object", 2)
logEvent(memory("inflowBC, internalNodes,updateLocal2Global", "OneLevelTransport"), level=4)
# mwf for interpolating subgrid error for gradients etc
if self.stabilization and self.stabilization.usesGradientStabilization:
self.timeIntegration = TimeIntegrationClass(self, integrateInterpolationPoints=True)
else:
self.timeIntegration = TimeIntegrationClass(self)
if options is not None:
self.timeIntegration.setFromOptions(options)
logEvent(memory("TimeIntegration", "OneLevelTransport"), level=4)
logEvent("Calculating numerical quadrature formulas", 2)
self.calculateQuadrature()
self.setupFieldStrides()
argsDict["isDiffusiveFluxBoundary_u"] = self.ebqe[('diffusiveFlux_bc_flag', 0, 0)]
argsDict["ebqe_bc_diffusiveFlux_u_ext"] = self.ebqe[('diffusiveFlux_bc', 0, 0)]
argsDict["ebqe_phi"] = self.coefficients.ebqe_phi
argsDict["epsFact"] = self.coefficients.epsFact
argsDict["ebqe_dissipation"] = self.coefficients.ebqe_dissipation
argsDict["ebqe_porosity"] = self.coefficients.ebqe_porosity
argsDict["ebqe_u"] = self.ebqe[('u', 0)]
argsDict["ebqe_flux"] = self.ebqe[('advectiveFlux', 0)]
self.kappa.calculateResidual(argsDict)
if self.forceStrongConditions:
for dofN, g in list(self.dirichletConditionsForceDOF.DOFBoundaryConditionsDict.items()):
r[dofN] = 0
if self.stabilization:
self.stabilization.accumulateSubgridMassHistory(self.q)
prof.logEvent("Global residual", level=9, data=r)
# mwf decide if this is reasonable for keeping solver statistics
self.nonlinear_function_evaluations += 1
if self.globalResidualDummy is None:
self.globalResidualDummy = np.zeros(r.shape, 'd')
def outputRow(self, time):
""" Outputs a single row of currently calculated gauge data to self.file"""
assert self.isGaugeOwner
if self.isPointGauge or self.isLineGauge:
self.localQuantitiesBuf = np.concatenate([gaugesVec.getArray() for gaugesVec in
self.pointGaugeVecs]).astype(np.double)
logEvent("Sending local array of type %s and shape %s to root on comm %s" % (
str(self.localQuantitiesBuf.dtype), str(self.localQuantitiesBuf.shape), str(self.gaugeComm)), 9)
if self.gaugeComm.rank == 0:
logEvent("Receiving global array of type %s and shape %s on comm %s" % (
str(self.localQuantitiesBuf.dtype), str(self.globalQuantitiesBuf.shape), str(self.gaugeComm)), 9)
self.gaugeComm.Gatherv(sendbuf=[self.localQuantitiesBuf, MPI.DOUBLE],
recvbuf=[self.globalQuantitiesBuf, (self.globalQuantitiesCounts, None),
MPI.DOUBLE], root=0)
self.gaugeComm.Barrier()
if self.isLineIntegralGauge:
lineIntegralGaugeBuf = self.lineIntegralGaugesVec.getArray()
globalLineIntegralGaugeBuf = lineIntegralGaugeBuf.copy()
self.gaugeComm.Reduce(lineIntegralGaugeBuf, globalLineIntegralGaugeBuf, op=MPI.SUM)
else:
globalLineIntegralGaugeBuf = []
if self.gaugeComm.rank == 0:
self.packFraction=closure.packFraction
self.packMargin=closure.packMargin
self.maxFraction=closure.maxFraction
self.frFraction=closure.frFraction
self.sigmaC=closure.sigmaC
self.C3e=closure.C3e
self.C4e=closure.C4e
self.eR=closure.eR
self.fContact=closure.fContact
self.mContact=closure.mContact
self.nContact=closure.nContact
self.angFriction=closure.angFriction
self.vos_limiter = closure.vos_limiter
self.mu_fr_limiter = closure.mu_fr_limiter
self.sedFlag = 1
prof.logEvent("INFO: Loading parameters for sediment closure",2)
except:
self.aDarcy=-1.
self.betaForch=-1.
self.grain=-1.
self.packFraction=-1.
self.packMargin=-1.
self.maxFraction=-1.
self.frFraction=-1.
self.sigmaC=-1.
self.C3e=-1.
self.C4e=-1.
self.eR=-1.
self.fContact=-1.
self.mContact=-1.
self.nContact=-1.
self.angFriction=-1.
def globalConstantSolve(self, u, r):
U = 0.0
R = 0.0
J = 0.0
(R, J) = self.globalConstantRJ(u, r, U)
its = 0
log(" Mass Conservation Residual 0 ", level=3, data=R)
RNORM_OLD = fabs(R)
while ((fabs(R) > self.atol and its < self.maxIts) or its < 1):
U -= old_div(R, (J + 1.0e-8))
(R, J) = self.globalConstantRJ(u, r, U)
lsits = 0
while(fabs(R) > 0.99 * RNORM_OLD and lsits < self.maxLSits):
lsits += 1
U += (0.5)**lsits * (old_div(R, (J + 1.0e-8)))
(R, J) = self.globalConstantRJ(u, r, U)
its += 1
log(" Mass Conservation Residual " + repr(its)+" ", level=3, data=R)
self.u[0].dof.flat[:] = U
sfConfig = p0.domain.PUMIMesh.size_field_config()
logEvent("h-adapt mesh by calling AdaptPUMIMesh")
if(sfConfig=="pseudo"):
logEvent("Testing solution transfer and restart feature of adaptation. No actual mesh adaptation!")
else:
p0.domain.PUMIMesh.adaptPUMIMesh()
#code to suggest adapting until error is reduced;
#not fully baked and can lead to infinite loops of adaptation
#if(sfConfig=="ERM"):
# p0.domain.PUMIMesh.get_local_error()
# while(p0.domain.PUMIMesh.willAdapt()):
# p0.domain.PUMIMesh.adaptPUMIMesh()
# p0.domain.PUMIMesh.get_local_error()
logEvent("Converting PUMI mesh to Proteus")
#ibaned: PUMI conversion #2
#TODO: this code is nearly identical to
#PUMI conversion #1, they should be merged
#into a function
if p0.domain.nd == 3:
mesh = MeshTools.TetrahedralMesh()
else:
mesh = MeshTools.TriangularMesh()
mesh.convertFromPUMI(p0.domain.PUMIMesh,
p0.domain.faceList,
p0.domain.regList,
parallel = self.comm.size() > 1,
dim = p0.domain.nd)
self.PUMI2Proteus(mesh)