Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# penalty term
# cek move to Numerical flux initialization
if 'penalty' in self.ebqe:
for ebNE in range(self.mesh.nExteriorElementBoundaries_global):
ebN = self.mesh.exteriorElementBoundariesArray[ebNE]
for k in range(
self.nElementBoundaryQuadraturePoints_elementBoundary):
self.ebqe['penalty'][ebNE, k] = old_div(self.numericalFlux.penalty_constant, \
self.mesh.elementBoundaryDiametersArray[ebN]**self.numericalFlux.penalty_power)
log(memory("numericalFlux", "OneLevelTransport"), level=4)
self.elementEffectiveDiametersArray = self.mesh.elementInnerDiametersArray
# use post processing tools to get conservative fluxes, None by default
from proteus import PostProcessingTools
self.velocityPostProcessor = PostProcessingTools.VelocityPostProcessingChooser(
self)
log(memory("velocity postprocessor", "OneLevelTransport"), level=4)
# helper for writing out data storage
from proteus import Archiver
self.elementQuadratureDictionaryWriter = Archiver.XdmfWriter()
self.elementBoundaryQuadratureDictionaryWriter = Archiver.XdmfWriter()
self.exteriorElementBoundaryQuadratureDictionaryWriter = Archiver.XdmfWriter()
self.globalResidualDummy = None
compKernelFlag = 0
if self.coefficients.useConstantH:
self.elementDiameter = self.mesh.elementDiametersArray.copy()
self.elementDiameter[:] = max(self.mesh.elementDiametersArray)
else:
self.elementDiameter = self.mesh.elementDiametersArray
self.mcorr3p = cMCorr3P.MCorr3P(
self.nSpace_global,
self.nQuadraturePoints_element,
self.u[0].femSpace.elementMaps.localFunctionSpace.dim,
self.timeIntegration = TimeIntegrationClass(self)
if options is not None:
self.timeIntegration.setFromOptions(options)
prof.logEvent(prof.memory("TimeIntegration", "OneLevelTransport"), level=4)
prof.logEvent("Calculating numerical quadrature formulas", 2)
self.calculateQuadrature()
self.setupFieldStrides()
comm = proteus.Comm.get()
self.comm = comm
if comm.size() > 1:
assert numericalFluxType is not None and numericalFluxType.useWeakDirichletConditions, "You must use a numerical flux to apply weak boundary conditions for parallel runs"
prof.logEvent(prof.memory("stride+offset", "OneLevelTransport"), level=4)
if numericalFluxType is not None:
if options is None or options.periodicDirichletConditions is None:
self.numericalFlux = numericalFluxType(self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict)
else:
self.numericalFlux = numericalFluxType(self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict,
options.periodicDirichletConditions)
else:
self.numericalFlux = None
# set penalty terms
# cek todo move into numerical flux initialization
else:
self.timeIntegration = TimeIntegrationClass(self)
if options is not None:
self.timeIntegration.setFromOptions(options)
logEvent(memory("TimeIntegration", "OneLevelTransport"), level=4)
logEvent("Calculating numerical quadrature formulas", 2)
self.calculateQuadrature()
self.setupFieldStrides()
comm = Comm.get()
self.comm = comm
if comm.size() > 1:
assert numericalFluxType is not None and numericalFluxType.useWeakDirichletConditions, "You must use a numerical flux to apply weak boundary conditions for parallel runs"
logEvent(memory("stride+offset", "OneLevelTransport"), level=4)
if numericalFluxType is not None:
if options is None or options.periodicDirichletConditions is None:
self.numericalFlux = numericalFluxType(self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict)
else:
self.numericalFlux = numericalFluxType(self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict,
options.periodicDirichletConditions)
else:
self.numericalFlux = None
# set penalty terms
# cek todo move into numerical flux initialization
self.timeIntegration = TimeIntegrationClass(self)
if options is not None:
self.timeIntegration.setFromOptions(options)
prof.logEvent(prof.memory("TimeIntegration", "OneLevelTransport"), level=4)
prof.logEvent("Calculating numerical quadrature formulas", 2)
self.calculateQuadrature()
self.setupFieldStrides()
comm = proteus.Comm.get()
self.comm = comm
if comm.size() > 1:
assert numericalFluxType is not None and numericalFluxType.useWeakDirichletConditions, "You must use a numerical flux to apply weak boundary conditions for parallel runs"
prof.logEvent(prof.memory("stride+offset", "OneLevelTransport"), level=4)
if numericalFluxType is not None:
if options is None or options.periodicDirichletConditions is None:
self.numericalFlux = numericalFluxType(self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict)
else:
self.numericalFlux = numericalFluxType(self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict,
options.periodicDirichletConditions)
else:
self.numericalFlux = None
# set penalty terms
# cek todo move into numerical flux initialization
self.internalNodes = None
log("Updating local to global mappings", 2)
self.updateLocal2Global()
log("Building time integration object", 2)
log(memory("inflowBC, internalNodes,updateLocal2Global",
"OneLevelTransport"), level=4)
# mwf for interpolating subgrid error for gradients etc
if self.stabilization and self.stabilization.usesGradientStabilization:
self.timeIntegration = TimeIntegrationClass(
self, integrateInterpolationPoints=True)
else:
self.timeIntegration = TimeIntegrationClass(self)
if options is not None:
self.timeIntegration.setFromOptions(options)
log(memory("TimeIntegration", "OneLevelTransport"), level=4)
log("Calculating numerical quadrature formulas", 2)
self.calculateQuadrature()
self.setupFieldStrides()
comm = Comm.get()
self.comm = comm
if comm.size() > 1:
assert numericalFluxType is not None and numericalFluxType.useWeakDirichletConditions, "You must use a numerical flux to apply weak boundary conditions for parallel runs"
log(memory("stride+offset", "OneLevelTransport"), level=4)
if numericalFluxType is not None:
if options is None or options.periodicDirichletConditions is None:
self.numericalFlux = numericalFluxType(
self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
ebN, 0]
for i in range(self.mesh.nNodes_element):
if i != ebN_element:
I = self.mesh.elementNodesArray[eN_global, i]
self.internalNodes -= set([I])
self.nNodes_internal = len(self.internalNodes)
self.internalNodesArray = numpy.zeros((self.nNodes_internal,), 'i')
for nI, n in enumerate(self.internalNodes):
self.internalNodesArray[nI] = n
#
del self.internalNodes
self.internalNodes = None
log("Updating local to global mappings", 2)
self.updateLocal2Global()
log("Building time integration object", 2)
log(memory("inflowBC, internalNodes,updateLocal2Global",
"OneLevelTransport"), level=4)
# mwf for interpolating subgrid error for gradients etc
if self.stabilization and self.stabilization.usesGradientStabilization:
self.timeIntegration = TimeIntegrationClass(
self, integrateInterpolationPoints=True)
else:
self.timeIntegration = TimeIntegrationClass(self)
if options is not None:
self.timeIntegration.setFromOptions(options)
log(memory("TimeIntegration", "OneLevelTransport"), level=4)
log("Calculating numerical quadrature formulas", 2)
self.calculateQuadrature()
self.setupFieldStrides()
if 'penalty' in self.ebqe:
for ebNE in range(self.mesh.nExteriorElementBoundaries_global):
ebN = self.mesh.exteriorElementBoundariesArray[ebNE]
for k in range(
self.nElementBoundaryQuadraturePoints_elementBoundary):
self.ebqe['penalty'][ebNE, k] = old_div(self.numericalFlux.penalty_constant, \
self.mesh.elementBoundaryDiametersArray[ebN]**self.numericalFlux.penalty_power)
log(memory("numericalFlux", "OneLevelTransport"), level=4)
self.elementEffectiveDiametersArray = self.mesh.elementInnerDiametersArray
# strong Dirichlet
self.dirichletConditionsForceDOF = {0: DOFBoundaryConditions(self.u[cj].femSpace, dofBoundaryConditionsSetterDict[cj], weakDirichletConditions=False)}
# use post processing tools to get conservative fluxes, None by default
from proteus import PostProcessingTools
self.velocityPostProcessor = PostProcessingTools.VelocityPostProcessingChooser(
self)
log(memory("velocity postprocessor", "OneLevelTransport"), level=4)
# helper for writing out data storage
from proteus import Archiver
self.elementQuadratureDictionaryWriter = Archiver.XdmfWriter()
self.elementBoundaryQuadratureDictionaryWriter = Archiver.XdmfWriter()
self.exteriorElementBoundaryQuadratureDictionaryWriter = Archiver.XdmfWriter()
self.globalResidualDummy = None
compKernelFlag = 0
self.pres = cPres.Pres(
self.nSpace_global,
self.nQuadraturePoints_element,
self.u[0].femSpace.elementMaps.localFunctionSpace.dim,
self .u[0].femSpace.referenceFiniteElement.localFunctionSpace.dim,
self.testSpace[0].referenceFiniteElement.localFunctionSpace.dim,
self.nElementBoundaryQuadraturePoints_elementBoundary,
compKernelFlag)
self.internalNodes = None
log("Updating local to global mappings", 2)
self.updateLocal2Global()
log("Building time integration object", 2)
log(memory("inflowBC, internalNodes,updateLocal2Global",
"OneLevelTransport"), level=4)
# mwf for interpolating subgrid error for gradients etc
if self.stabilization and self.stabilization.usesGradientStabilization:
self.timeIntegration = TimeIntegrationClass(
self, integrateInterpolationPoints=True)
else:
self.timeIntegration = TimeIntegrationClass(self)
if options is not None:
self.timeIntegration.setFromOptions(options)
log(memory("TimeIntegration", "OneLevelTransport"), level=4)
log("Calculating numerical quadrature formulas", 2)
self.calculateQuadrature()
self.setupFieldStrides()
comm = Comm.get()
self.comm = comm
if comm.size() > 1:
assert numericalFluxType is not None and numericalFluxType.useWeakDirichletConditions, "You must use a numerical flux to apply weak boundary conditions for parallel runs"
# add some structures for elliptic re-distancing
self.interface_locator = None
self.cell_interface_locator = None
self.interface_lumpedMassMatrix = numpy.zeros(self.u[0].dof.shape,'d')
self.abs_grad_u = numpy.zeros(self.u[0].dof.shape,'d')
self.lumped_qx = numpy.zeros(self.u[0].dof.shape,'d')
self.lumped_qy = numpy.zeros(self.u[0].dof.shape,'d')
if 'penalty' in self.ebq_global:
for ebN in range(self.mesh.nElementBoundaries_global):
for k in range(
self.nElementBoundaryQuadraturePoints_elementBoundary):
self.ebq_global['penalty'][ebN, k] = old_div(self.numericalFlux.penalty_constant, (
self.mesh.elementBoundaryDiametersArray[ebN]**self.numericalFlux.penalty_power))
# penalty term
# cek move to Numerical flux initialization
if 'penalty' in self.ebqe:
for ebNE in range(self.mesh.nExteriorElementBoundaries_global):
ebN = self.mesh.exteriorElementBoundariesArray[ebNE]
for k in range(
self.nElementBoundaryQuadraturePoints_elementBoundary):
self.ebqe['penalty'][ebNE, k] = old_div(self.numericalFlux.penalty_constant, \
self.mesh.elementBoundaryDiametersArray[ebN]**self.numericalFlux.penalty_power)
log(memory("numericalFlux", "OneLevelTransport"), level=4)
self.elementEffectiveDiametersArray = self.mesh.elementInnerDiametersArray
# use post processing tools to get conservative fluxes, None by default
from proteus import PostProcessingTools
self.velocityPostProcessor = PostProcessingTools.VelocityPostProcessingChooser(
self)
log(memory("velocity postprocessor", "OneLevelTransport"), level=4)
# helper for writing out data storage
from proteus import Archiver
self.elementQuadratureDictionaryWriter = Archiver.XdmfWriter()
self.elementBoundaryQuadratureDictionaryWriter = Archiver.XdmfWriter()
self.exteriorElementBoundaryQuadratureDictionaryWriter = Archiver.XdmfWriter()
self.globalResidualDummy = None
log("flux bc objects")
for ci, fbcObject in list(self.fluxBoundaryConditionsObjectsDict.items()):
self.ebqe[('advectiveFlux_bc_flag', ci)] = numpy.zeros(self.ebqe[('advectiveFlux_bc', ci)].shape, 'i')
for t, g in list(fbcObject.advectiveFluxBoundaryConditionsDict.items()):
else:
self.timeIntegration = TimeIntegrationClass(self)
if options is not None:
self.timeIntegration.setFromOptions(options)
log(memory("TimeIntegration", "OneLevelTransport"), level=4)
log("Calculating numerical quadrature formulas", 2)
self.calculateQuadrature()
self.setupFieldStrides()
comm = Comm.get()
self.comm = comm
if comm.size() > 1:
assert numericalFluxType is not None and numericalFluxType.useWeakDirichletConditions, "You must use a numerical flux to apply weak boundary conditions for parallel runs"
log(memory("stride+offset", "OneLevelTransport"), level=4)
if numericalFluxType is not None:
if options is None or options.periodicDirichletConditions is None:
self.numericalFlux = numericalFluxType(
self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict)
else:
self.numericalFlux = numericalFluxType(
self,
dofBoundaryConditionsSetterDict,
advectiveFluxBoundaryConditionsSetterDict,
diffusiveFluxBoundaryConditionsSetterDictDict,
options.periodicDirichletConditions)
else:
self.numericalFlux = None