Source code for PyNucleus_multilevelSolver.levels

###################################################################################
# Copyright 2021 National Technology & Engineering Solutions of Sandia,           #
# LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the           #
# U.S. Government retains certain rights in this software.                        #
# If you want to use this code, please refer to the README.rst and LICENSE files. #
###################################################################################

import logging
import numpy as np
import mpi4py.rc
mpi4py.rc.initialize = False
from mpi4py import MPI
from PyNucleus_fem.DoFMaps import P1_DoFMap
from PyNucleus_fem import str2DoFMap
from PyNucleus_base.myTypes import REAL
from PyNucleus_base.linear_operators import CSR_LinearOperator
from PyNucleus_base.linear_operators import SSS_LinearOperator
from . restrictionProlongation import buildRestrictionProlongation
from PyNucleus_base.utilsFem import TimerManager
from PyNucleus_base.ip_norm import (ip_serial, norm_serial,
                                    ip_distributed, norm_distributed,
                                    wrapRealInnerToComplex, wrapRealNormToComplex)
from PyNucleus_fem.femCy import assembleMatrix
from PyNucleus_fem.functions import function
from PyNucleus_fem.distributed_operators import (DistributedLinearOperator,
                                                 CSR_DistributedLinearOperator)
from PyNucleus_fem import (DIRICHLET, NEUMANN,
                           HOMOGENEOUS_DIRICHLET, HOMOGENEOUS_NEUMANN,
                           boundaryConditions)
from PyNucleus_fem.femCy import stiffness_1d_in_2d_sym_P1
from PyNucleus_fem.mesh import (PHYSICAL, NO_BOUNDARY, INTERIOR_NONOVERLAPPING, INTERIOR)
LOGGER = logging.getLogger(__name__)

# what should be built
DOFMAPS = 1
RESTRICTION_PROLONGATION = 2
SPARSITY_PATTERN = 4
OVERLAPS = 8
ASSEMBLY = 16

NO_BUILD = 0
DOFMAPS_ONLY = DOFMAPS
RESTRICTION_PROLONGATION_ONLY = DOFMAPS + RESTRICTION_PROLONGATION
SPARSITY_ONLY = DOFMAPS + OVERLAPS + RESTRICTION_PROLONGATION + SPARSITY_PATTERN
SINGLE_LEVEL = DOFMAPS + OVERLAPS + ASSEMBLY
FULL_BUILD = DOFMAPS + OVERLAPS + RESTRICTION_PROLONGATION + ASSEMBLY

# What information is retained in meshLevels
DELETE_MESH = 0
KEEP_MESH = 1


[docs] class level:
[docs] def __init__(self, params, previousLevel=None, comm=None, label='', startLevelNo=0, isLastLevel=False): self.params = params self.previousLevel = previousLevel if previousLevel is not None: assert not previousLevel.isLastLevel self.startLevelNo = startLevelNo self.nextLevel = None self.comm = comm self.label = label self.isLastLevel = isLastLevel label = '{}: '.format(self.levelID) self.Timer = TimerManager(LOGGER, comm=self.comm, prefix=label)
[docs] def getLevelNo(self): if self.previousLevel is None: return self.startLevelNo else: return self.previousLevel.getLevelNo()+1
levelNo = property(fget=getLevelNo)
[docs] def getLevelID(self): if len(self.label) > 0: label = '{} {}'.format(self.label, self.levelNo) else: label = 'Level {}'.format(self.levelNo) return label
levelID = property(fget=getLevelID) def __repr__(self): if len(self.label) > 0: label = '{} {}'.format(self.label, self.levelNo) else: label = '{}'.format(self.levelNo) s = '{} {}\n'.format(self.__class__.__name__, label) return s
######################################################################
[docs] class meshLevel(level):
[docs] def __init__(self, mesh, params, previousLevel=None, interfaces=None, meshOverlaps=None, interiorBL=None, comm=None, label='', meshInformationPolicy=KEEP_MESH, startLevelNo=0, isLastLevel=False): super(meshLevel, self).__init__(params, previousLevel, comm, label, startLevelNo, isLastLevel) self.mesh = mesh self.global_mesh = None self.interfaces = interfaces if self.interfaces is not None and self.params['debugOverlaps']: self.interfaces.validate(self.mesh, self.comm, label='Mesh interface \'{} {}\''.format(self.label, self.levelNo)) self.meshOverlaps = meshOverlaps if self.meshOverlaps is not None and self.params['debugOverlaps']: self.meshOverlaps.check(self.mesh, self.comm, label='Mesh overlap \'{} {}\''.format(self.label, self.levelNo)) self.interiorBL = interiorBL self.algebraicLevel = None self.meshInformationPolicy = meshInformationPolicy self._h = None self.algebraicLevelType = algebraicLevel
[docs] def setAlgebraicLevelType(self, algLevelType): self.algebraicLevelType = algLevelType
[docs] def refine(self, meshInformationPolicy): with self.Timer('Refined mesh'): newMesh, self.lookup = self.mesh.refine(returnLookup=True) if self.params['meshTransformation'] is not None: self.params['meshTransformation'](newMesh, self.lookup) if self.interfaces is not None: with self.Timer('Refined interfaces'): self.interfaces.refine(newMesh) if self.meshOverlaps is not None: with self.Timer('Refined mesh overlaps'): meshOverlaps = self.meshOverlaps.copy() meshOverlaps.refine(newMesh) if self.meshOverlaps is None: meshOverlaps = None if self.interiorBL is not None: with self.Timer('Refined boundary layers'): self.interiorBL.refine(newMesh) newMeshLevel = meshLevel(newMesh, self.params, self, self.interfaces, meshOverlaps, self.interiorBL, self.comm, self.label, meshInformationPolicy) if hasattr(self, 'numberCellsBeforeExtension'): newMeshLevel.numberCellsBeforeExtension = 2**self.mesh.dim * self.numberCellsBeforeExtension if hasattr(self, 'numberCellsLastLayer'): newMeshLevel.numberCellsLastLayer = 2**self.mesh.dim * self.numberCellsLastLayer newMeshLevel.setAlgebraicLevelType(self.algebraicLevelType) self.nextLevel = newMeshLevel return newMeshLevel
[docs] def copy(self): newMeshLevel = meshLevel(self.mesh, self.params, self, self.interfaces, self.meshOverlaps, self.interiorBL, self.comm, self.label, self.meshInformationPolicy) return newMeshLevel
[docs] def getIsDistributed(self): return self.interfaces is not None
isDistributed = property(fget=getIsDistributed)
[docs] def getAlgebraicLevel(self, buildType): self.algebraicLevel = self.algebraicLevelType(self, buildType) return self.algebraicLevel
[docs] def clean(self): if self.meshInformationPolicy == DELETE_MESH: self.mesh = None self.meshOverlaps = None self.interfaces = None
[docs] def getLevelDict(self): lvl = {} if self.mesh is not None: lvl['mesh'] = self.mesh if self.interfaces is not None: lvl['interfaces'] = self.interfaces if self.meshOverlaps is not None: lvl['meshOverlaps'] = self.meshOverlaps return lvl
[docs] @staticmethod def fromLevelDict(lvl, params={}, previousLevel=None, comm=None, startLevelNo=0, label=''): alvl = meshLevel(None, params, previousLevel, comm=comm, startLevelNo=startLevelNo, label=label) if 'mesh' in lvl: alvl.mesh = lvl['mesh'] if 'interfaces' in lvl: alvl.interfaces = lvl['interfaces'] if 'meshOverlaps' in lvl: alvl.meshOverlaps = lvl['meshOverlaps'] return alvl
def __repr__(self): s = super(meshLevel, self).__repr__() if self.mesh is not None: s += ' mesh: '+self.mesh.__repr__() if self.interfaces is not None: s += self.interfaces.__repr__() return s
[docs] def getH(self): if self._h is None: h = self.mesh.h if self.comm is not None: self._h = self.comm.allreduce(h, op=MPI.MAX) return self._h
h = property(fget=getH)
######################################################################
[docs] class algebraicLevelBase(level):
[docs] def __init__(self, meshLevel, buildType): if meshLevel.previousLevel is not None: previousLevel = meshLevel.previousLevel.algebraicLevel else: previousLevel = None super(algebraicLevelBase, self).__init__(meshLevel.params, previousLevel, meshLevel.comm, meshLevel.label, meshLevel.levelNo, meshLevel.isLastLevel) self.meshLevel = meshLevel self.P = None self.R = None self.DoFMap = None self.algebraicOverlaps = None self.build(buildType)
[docs] def build(self, buildType): buildNeumann = self.params.get('buildNeumann', False) element = self.params['element'] reorder = self.params['reorder'] commType = self.params['commType'] DoFMap_type = str2DoFMap(element) # Set DoFMap if buildType & DOFMAPS: if 'tag' in self.params: self.DoFMap = DoFMap_type(self.meshLevel.mesh, self.params['tag']) elif 'boundaryCondition' in self.params: if self.params['boundaryCondition'] in (HOMOGENEOUS_NEUMANN, DIRICHLET, NEUMANN): self.DoFMap = DoFMap_type(self.meshLevel.mesh, NO_BOUNDARY) elif self.params['boundaryCondition'] == HOMOGENEOUS_DIRICHLET: self.DoFMap = DoFMap_type(self.meshLevel.mesh, PHYSICAL) else: raise NotImplementedError(boundaryConditions[self.params['boundaryCondition']]) else: if self.isLastLevel and self.params['interiorBC'] == 'homogeneousDirichlet' and hasattr(self.meshLevel, 'numberCellsLastLayer'): self.DoFMap = DoFMap_type(self.meshLevel.mesh, [PHYSICAL, INTERIOR], skipCellsAfter=self.meshLevel.mesh.num_cells-self.meshLevel.numberCellsLastLayer) elif not hasattr(self.meshLevel, 'numberCellsLastLayer') or not self.isLastLevel or self.params['interiorBC'] == 'homogeneousNeumann': self.DoFMap = DoFMap_type(self.meshLevel.mesh, [PHYSICAL]) else: raise NotImplementedError() if buildNeumann: self.DoFMapNeumann = DoFMap_type(self.meshLevel.mesh, [PHYSICAL]) if not reorder: if buildType & OVERLAPS: # build algebraic overlaps if self.meshLevel.meshOverlaps is not None: with self.Timer('Build algebraic overlaps of type \'{}\''.format(commType)): self.algebraicOverlaps = self.meshLevel.meshOverlaps.getDoFs(self.meshLevel.mesh, self.DoFMap, commType, allowInteriorBoundary=((self.params['interiorBC'] == 'homogeneousNeumann') or not self.isLastLevel)) if self.params['debugOverlaps']: self.algebraicOverlaps.check(mesh=self.meshLevel.mesh, dm=self.DoFMap, label='algebraicOverlaps in \'{} {}\''.format(self.label, self.levelNo), interfaces=self.meshLevel.meshOverlaps) elif self.meshLevel.interfaces is not None: with self.Timer('Build algebraic overlaps of type \'{}\''.format(commType)): self.algebraicOverlaps = self.meshLevel.interfaces.getDoFs(self.meshLevel.mesh, self.DoFMap, commType) if self.params['debugOverlaps']: self.algebraicOverlaps.check(mesh=self.meshLevel.mesh, dm=self.DoFMap, label='algebraicOverlaps in \'{} {}\''.format(self.label, self.levelNo), interfaces=self.meshLevel.interfaces) if self.algebraicOverlaps is not None: self.inner = ip_distributed(self.algebraicOverlaps, 0) self.norm = norm_distributed(self.algebraicOverlaps, 0) else: self.inner = ip_serial() self.norm = norm_serial() if self.DoFMap is not None: self.DoFMap.set_ip_norm(self.inner, self.norm) self.DoFMap.set_complex_ip_norm(wrapRealInnerToComplex(self.inner), wrapRealNormToComplex(self.norm)) if (buildType & RESTRICTION_PROLONGATION) and (self.previousLevel is not None): assert (self.previousLevel.DoFMap is not None) and (self.DoFMap is not None) # use reorder here, since reorder=False bugs out (self.R, self.P) = buildRestrictionProlongation(self.previousLevel.DoFMap, self.DoFMap)
[docs] def buildCoarserMatrices(self): """ Recursively build matrices on coarser levels """ if self.previousLevel is not None: self.previousLevel.buildCoarserMatrices()
[docs] def clean(self): if not self.params['keepAllDoFMaps'] and self.previousLevel is not None: self.DoFMap = None
[docs] @classmethod def getKeys(cls): return ['P', 'R', 'DoFMap', 'algebraicOverlaps', 'Timer']
[docs] def getLevelDict(self): lvl = {} for key in self.getKeys(): if getattr(self, key) is not None: lvl[key] = getattr(self, key) return lvl
[docs] @classmethod def fromLevelDict(cls, meshLevel, lvl): alvl = algebraicLevel(meshLevel, NO_BUILD) for key in cls.getKeys(): if key in lvl: setattr(alvl, key, lvl[key]) return alvl
@property def accumulateOperator(self): if self.algebraicOverlaps is not None: return self.algebraicOverlaps.getAccumulateOperator() else: return None
[docs] class algebraicLevel(algebraicLevelBase):
[docs] def __init__(self, meshLevel, buildType): self.A = None self.S = None self.D = None self.M = None self.surface_mass = None self.surface_stiffness = None super(algebraicLevel, self).__init__(meshLevel, buildType)
[docs] def build(self, buildType): super(algebraicLevel, self).build(buildType) diffusivity = self.params['diffusivity'] reaction = self.params['reaction'] symmetric = self.params['symmetric'] element = self.params['element'] reorder = self.params['reorder'] commType = self.params['commType'] buildMass = self.params['buildMass'] or reaction is not None driftCoeff = self.params.get('driftCoeff', None) buildNeumann = self.params.get('buildNeumann', False) if buildType & SPARSITY_PATTERN: # set up sparsity patterns only DoFMap = self.DoFMap mesh = self.meshLevel.mesh self.fullyAssembled = False with self.Timer('Prepared sparsity patterns'): self.S = DoFMap.buildSparsityPattern(mesh.cells, symmetric=symmetric, reorder=reorder) if driftCoeff is not None: self.D = self.S.copy() if buildMass: self.M = self.S.copy() if buildType & ASSEMBLY: # fully build matrices DoFMap = self.DoFMap mesh = self.meshLevel.mesh self.fullyAssembled = True with self.Timer('Assembled matrices'): self.S = DoFMap.assembleStiffness(sss_format=symmetric, reorder=reorder, diffusivity=diffusivity) if buildMass: self.M = DoFMap.assembleMass(sss_format=symmetric, reorder=reorder) if driftCoeff is not None: self.D = DoFMap.assembleDrift(driftCoeff) if buildNeumann: self.neumannA = self.DoFMapNeumann.assembleStiffness(sss_format=symmetric, reorder=reorder, diffusivity=diffusivity) if isinstance(reaction, (float, REAL)): self.A = self.S.copy() for j in range(self.A.data.shape[0]): self.A.data[j] += reaction*self.M.data[j] if isinstance(self.A, SSS_LinearOperator): for j in range(self.A.num_rows): self.A.diagonal[j] += reaction*self.M.diagonal[j] elif isinstance(reaction, function): self.A = self.S.copy() dm = self.DoFMap c = dm.interpolate(reaction) for k in range(dm.num_dofs): for j in range(self.A.indptr[k], self.A.indptr[k+1]): self.A.data[j] += c[k]*self.M.data[j] if isinstance(self.A, SSS_LinearOperator): for k in range(self.A.num_rows): self.A.diagonal[k] += c[k]*self.M.diagonal[k] elif reaction is None: self.A = self.S else: raise NotImplementedError() # surface mass matrix if self.isLastLevel and self.params['buildSurfaceMass']: with self.Timer('Build surface mass matrix'): if self.params['depth'] > 0: surface = mesh.get_surface_mesh(INTERIOR) else: surface = mesh.get_surface_mesh(INTERIOR_NONOVERLAPPING) from PyNucleus_fem.femCy import assembleSurfaceMass self.surface_mass = assembleSurfaceMass(mesh, surface, self.DoFMap, sss_format=symmetric, reorder=reorder) # ToDo: Don't just copy the sparsity pattern, this is a big waste of memory # data = np.zeros((self.A.nnz), dtype=REAL) # if symmetric: # diagonal = np.zeros(self.A.shape[0], dtype=REAL) # M = SSS_LinearOperator(self.A.indices, self.A.indptr, data, diagonal) # else: # M = CSR_LinearOperator(self.A.indices, self.A.indptr, data) # if element == 'P1': # dmS = P1_DoFMap(mesh, [PHYSICAL]) # dmS.cells = surface.cells # elif element == 'P2': # assert False, "Surface mass matrix not implemented for P2." # dmS = P2_DoFMap(mesh, [PHYSICAL]) # cellOrig = dmS.mesh.cells # dmS.mesh.cells = surface.cells # if mesh.dim == 1 and element == 'P1': # dmS.dofs_per_element = 1 # self.surface_mass = assembleMatrix(surface, dmS, mass_0d_in_1d_sym_P1(), A=M, # sss_format=symmetric, reorder=reorder) # elif mesh.dim == 2 and element == 'P1': # dmS.dofs_per_element = 2 # self.surface_mass = assembleMatrix(surface, dmS, mass_1d_in_2d_sym_P1(), A=M, # sss_format=symmetric, reorder=reorder) # elif mesh.dim == 2 and element == 'P2': # dmS.dofs_per_element = 3 # self.surface_mass = assembleMatrix(surface, dmS, mass_1d_in_2d_sym_P2(), A=M, # sss_format=symmetric, reorder=reorder) # dmS.mesh.cells = cellOrig # else: # raise NotImplementedError() # surface stiffness matrix if self.isLastLevel and self.params['buildSurfaceStiffness']: with self.Timer('Build surface stiffness matrix'): if self.params['depth'] > 0: surface = mesh.get_surface_mesh(INTERIOR) else: surface = mesh.get_surface_mesh(INTERIOR_NONOVERLAPPING) # ToDo: Don't just copy the sparsity pattern, this is a big waste of memory data = np.zeros((self.A.nnz), dtype=REAL) if symmetric: diagonal = np.zeros(self.A.shape[0], dtype=REAL) AS = SSS_LinearOperator(self.A.indices, self.A.indptr, data, diagonal) else: AS = CSR_LinearOperator(self.A.indices, self.A.indptr, data) assert element == 'P1', "Surface stiffness matrix only implemented for P1" dmS = P1_DoFMap(mesh, [PHYSICAL]) dmS.cells = surface.cells if mesh.dim == 2: dmS.dofs_per_element = 2 self.surfaceStiffness = assembleMatrix(surface, dmS, stiffness_1d_in_2d_sym_P1(), A=AS, sss_format=symmetric, reorder=reorder) else: raise NotImplementedError() if reorder and buildType & OVERLAPS: # build algebraic overlaps if self.meshLevel.meshOverlaps is not None: with self.Timer('Build algebraic overlaps of type \'{}\''.format(commType)): self.algebraicOverlaps = self.meshLevel.meshOverlaps.getDoFs(self.meshLevel.mesh, self.DoFMap, commType, allowInteriorBoundary=((self.params['interiorBC'] == 'homogeneousNeumann') or not self.isLastLevel)) if self.params['debugOverlaps']: self.algebraicOverlaps.check(mesh=self.meshLevel.mesh, dm=self.DoFMap, label='algebraicOverlaps in \'{} {}\''.format(self.label, self.levelNo), interfaces=self.meshLevel.meshOverlaps) elif self.meshLevel.interfaces is not None: with self.Timer('Build algebraic overlaps of type \'{}\''.format(commType)): self.algebraicOverlaps = self.meshLevel.interfaces.getDoFs(self.meshLevel.mesh, self.DoFMap, commType) if self.params['debugOverlaps']: self.algebraicOverlaps.check(mesh=self.meshLevel.mesh, dm=self.DoFMap, label='algebraicOverlaps in \'{} {}\''.format(self.label, self.levelNo), interfaces=self.meshLevel.interfaces) if reorder and (buildType & RESTRICTION_PROLONGATION) and (self.previousLevel is not None): assert (self.previousLevel.DoFMap is not None) and (self.DoFMap is not None) # use reorder here, since reorder=False bugs out (self.R, self.P) = buildRestrictionProlongation(self.previousLevel.DoFMap, self.DoFMap)
[docs] def buildCoarserMatrices(self): """ Recursively build matrices on coarser levels """ if self.previousLevel is None: return if self.S is not None and self.P is not None and self.previousLevel.S is not None and not self.previousLevel.fullyAssembled: assert self.P.shape[0] == self.S.shape[0], (self.R.shape[1], self.S.shape[0]) assert self.P.shape[1] == self.previousLevel.S.shape[0] with self.Timer('Restrict stiffness matrix'): self.P.restrictMatrix(self.S, self.previousLevel.S) if self.previousLevel.A is None: self.previousLevel.A = self.previousLevel.S if self.D is not None and self.P is not None and self.previousLevel.D is not None and not self.previousLevel.fullyAssembled: assert self.P.shape[0] == self.D.shape[0] assert self.P.shape[1] == self.previousLevel.D.shape[0] with self.Timer('Restrict drift matrix'): self.P.restrictMatrix(self.D, self.previousLevel.D) if self.M is not None and self.P is not None and self.previousLevel.M is not None and not self.previousLevel.fullyAssembled: assert self.P.shape[0] == self.M.shape[0] assert self.P.shape[1] == self.previousLevel.M.shape[0] with self.Timer('Restrict mass matrix'): self.P.restrictMatrix(self.M, self.previousLevel.M) if self.M is not None and self.A is not None and self.R is not None and self.previousLevel.A is not None and self.previousLevel.M is not None: reaction = self.params['reaction'] if isinstance(reaction, (float, REAL)): for j in range(self.previousLevel.A.data.shape[0]): self.previousLevel.A.data[j] += reaction*self.previousLevel.M.data[j] if isinstance(self.previousLevel.A, SSS_LinearOperator): for j in range(self.previousLevel.A.num_rows): self.previousLevel.A.diagonal[j] += reaction*self.previousLevel.M.diagonal[j] elif isinstance(reaction, function): dm = self.previousLevel.DoFMap c = dm.interpolate(reaction) for k in range(dm.num_dofs): for j in range(self.previousLevel.A.indptr[k], self.previousLevel.A.indptr[k+1]): self.previousLevel.A.data[j] += c[k]*self.previousLevel.M.data[j] if isinstance(self.previousLevel.A, SSS_LinearOperator): for k in range(self.previousLevel.A.num_rows): self.previousLevel.A.diagonal[k] += c[k]*self.previousLevel.M.diagonal[k] elif reaction is None: pass else: raise NotImplementedError() if self.previousLevel is not None: self.previousLevel.fullyAssembled = True self.previousLevel.buildCoarserMatrices()
[docs] @classmethod def getKeys(cls): return algebraicLevelBase.getKeys() + ['A', 'S', 'D', 'M', 'surface_mass', 'surface_stiffness']
[docs] def getLevelDict(self): lvl = super(algebraicLevel, self).getLevelDict() if hasattr(self, ' neumannA'): lvl['neumannA'] = self.neumannA return lvl
[docs] def getGlobalA(self, doDistribute=False, keepDistributedResult=False): if self.A is not None: if self.algebraicOverlaps is not None: if isinstance(self.A, CSR_LinearOperator): return CSR_DistributedLinearOperator(self.A, self.algebraicOverlaps, doDistribute=doDistribute, keepDistributedResult=keepDistributedResult) else: return DistributedLinearOperator(self.A, self.algebraicOverlaps, doDistribute=doDistribute, keepDistributedResult=keepDistributedResult) else: return self.A else: return None