Source code for nutils.topology

# Copyright (c) 2014 Evalf
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

"""
The topology module defines the topology objects, notably the
:class:`StructuredTopology` and :class:`UnstructuredTopology`. Maintaining
strict separation of topological and geometrical information, the topology
represents a set of elements and their interconnectivity, boundaries,
refinements, subtopologies etc, but not their positioning in physical space. The
dimension of the topology represents the dimension of its elements, not that of
the the space they are embedded in.

The primary role of topologies is to form a domain for :mod:`nutils.function`
objects, like the geometry function and function bases for analysis, as well as
provide tools for their construction. It also offers methods for integration and
sampling, thus providing a high level interface to operations otherwise written
out in element loops. For lower level operations topologies can be used as
:mod:`nutils.element` iterators.
"""

from . import element, function, util, numpy, parallel, matrix, log, config, numeric, cache, transform, warnings, _
import functools, collections.abc, itertools, functools, operator

_identity = lambda x: x

[docs]class Topology: 'topology base class' # subclass needs to implement: .elements def __init__(self, ndims): 'constructor' assert numeric.isint(ndims) and ndims >= 0 self.ndims = ndims def __str__(self): 'string representation' return '{}(#{})'.format(self.__class__.__name__, len(self)) def __len__(self): return len(self.elements) def __iter__(self): return iter(self.elements) def getitem(self, item): return EmptyTopology(self.ndims) def __getitem__(self, item): if not isinstance(item, tuple): item = item, if all(it in (...,slice(None)) for it in item): return self topo = self.getitem(item) if len(item) != 1 or not isinstance(item[0],str) \ else functools.reduce(operator.or_, map(self.getitem, item[0].split(',')), EmptyTopology(self.ndims)) if not topo: raise KeyError(item) return topo def __invert__(self): return OppositeTopology(self) def __or__(self, other): assert isinstance(other, Topology) and other.ndims == self.ndims return other if not self \ else self if not other \ else NotImplemented if isinstance(other, UnionTopology) \ else UnionTopology((self,other)) __ror__ = lambda self, other: self.__or__(other) def __and__(self, other): # Strategy: loop over combined elements sorted by .transform while keeping # track of the origin (mine=True for self, mine=False for other), and # select an element if it is equal to or a refinement of the previous # (hold) element and it originates from the other topology (mine == need). # Hold is not updated in case of a match because it might match multiple # children. elems = [] need = None for elem, mine in sorted([(elem, True) for elem in self] + [(elem, False) for elem in other], key=lambda v: v[0].transform): if mine == need and elem.transform[:len(hold.transform)] == hold.transform: assert elem.opposite[:len(hold.opposite)] == hold.opposite elems.append(elem) else: hold = elem need = not mine return UnstructuredTopology(self.ndims, elems) __rand__ = lambda self, other: self.__and__(other) def __add__(self, other): return self | other def __contains__(self, element): ielem = self.edict.get(element.transform) return ielem is not None and self.elements[ielem] == element def __sub__(self, other): assert isinstance(other, Topology) and other.ndims == self.ndims return other.__rsub__(self) def __rsub__(self, other): assert isinstance(other, Topology) and other.ndims == self.ndims return other - other.subset(self, newboundary=getattr(self,'boundary',None)) def __mul__(self, other): return ProductTopology(self, other) @cache.property def edict(self): '''transform -> ielement mapping''' return {elem.transform: ielem for ielem, elem in enumerate(self)} @cache.property def border_transforms(self): border_transforms = set() for belem in self.boundary: try: ielem, tail = transform.lookup_item(belem.transform, self.edict) except KeyError: pass else: border_transforms.add(self.elements[ielem].transform) return border_transforms @property def refine_iter(self): topo = self for irefine in log.count('refinement level'): yield topo topo = topo.refined stdfunc = lambda self, *args, **kwargs: self.basis('std', *args, **kwargs) linearfunc = lambda self, *args, **kwargs: self.basis('std', degree=1, *args, **kwargs) splinefunc = lambda self, *args, **kwargs: self.basis('spline', *args, **kwargs) bubblefunc = lambda self, *args, **kwargs: self.basis('bubble', *args, **kwargs) discontfunc = lambda self, *args, **kwargs: self.basis('discont', *args, **kwargs) def basis(self, name, *args, **kwargs): if self.ndims == 0: return function.asarray([1]) f = getattr(self, 'basis_' + name) return f(*args, **kwargs)
[docs] @log.title @util.single_or_multiple def elem_eval(self, funcs, ischeme, separate=False, geometry=None, asfunction=False, edit=_identity, *, arguments=None): 'element-wise evaluation' fcache = cache.WrapperCache() assert not separate or not asfunction, '"separate" and "asfunction" are mutually exclusive' if geometry: iwscale = function.J(geometry, self.ndims) npoints = len(self) slices = range(npoints) else: iwscale = 1 slices = [] npoints = 0 for elem in log.iter('elem', self): ipoints, iweights = ischeme[elem] if isinstance(ischeme,collections.abc.Mapping) else fcache[elem.reference.getischeme](ischeme) np = len(ipoints) slices.append(slice(npoints,npoints+np)) npoints += np nprocs = min(config.nprocs, len(self)) zeros = parallel.shzeros if nprocs > 1 else numpy.zeros retvals = [] idata = [] for ifunc, func in enumerate(funcs): func = function.asarray(edit(func * iwscale)) func = function.zero_argument_derivatives(func) retval = zeros((npoints,)+func.shape, dtype=func.dtype) idata.extend(function.Tuple([ifunc, function.Tuple(ind), f.simplified]) for ind, f in function.blocks(func)) retvals.append(retval) idata = function.Tuple(idata) if config.dot: idata.graphviz() if arguments is None: arguments = {} for ielem, elem in parallel.pariter(log.enumerate('elem', self), nprocs=nprocs): ipoints, iweights = ischeme[elem] if isinstance(ischeme,collections.abc.Mapping) else fcache[elem.reference.getischeme](ischeme) s = slices[ielem], try: for ifunc, index, data in idata.eval(_transforms=(elem.transform, elem.opposite), _points=ipoints, _cache=fcache, **arguments): numpy.add.at(retvals[ifunc], s+numpy.ix_(*[ind for (ind,) in index]), numeric.dot(iweights,data) if geometry else data) except function.EvaluationError: warnings.warn('not all functions evaluated successfully') for ifunc, indexfunc, datafunc in idata: index = indexfunc.eval(_transforms=(elem.transform, elem.opposite), _points=ipoints, _cache=fcache, **arguments) try: data = datafunc.eval(_transforms=(elem.transform, elem.opposite), _points=ipoints, _cache=fcache, **arguments) except function.EvaluationError: retvals[ifunc][s+numpy.ix_(*[ind for (ind,) in index])] = numpy.nan else: numpy.add.at(retvals[ifunc], s+numpy.ix_(*[ind for (ind,) in index]), numeric.dot(iweights,data) if geometry else data) log.debug('cache', fcache.stats) log.info('created', ', '.join('{}({})'.format(retval.__class__.__name__, ','.join(str(n) for n in retval.shape)) for retval in retvals)) if asfunction: if geometry: retvals = [function.elemwise({elem.transform: value for elem, value in zip(self, retval)}, shape=retval.shape[1:]) for retval in retvals] else: tsp = [(elem.transform, s, fcache[elem.reference.getischeme](ischeme)[0]) for elem, s in zip(self, slices)] retvals = [function.sampled({trans: (numeric.const(retval[s], copy=False), points) for trans, s, points in tsp}, self.ndims) for retval in retvals] elif separate: retvals = [[retval[s] for s in slices] for retval in retvals] return retvals
[docs] @log.title @util.single_or_multiple def elem_mean(self, funcs, geometry, ischeme, *, arguments=None): 'element-wise average' retvals = self.elem_eval((1,)+funcs, geometry=geometry, ischeme=ischeme, arguments=arguments) return [v / retvals[0][(slice(None),)+(_,)*(v.ndim-1)] for v in retvals[1:]]
def _integrate(self, funcs, ischeme, fcache=None, arguments=None): if arguments is None: arguments = {} # Functions may consist of several blocks, such as originating from # chaining. Here we make a list of all blocks consisting of triplets of # argument id, evaluable index, and evaluable values. blocks = [(ifunc, function.Tuple(ind), f.simplified) for ifunc, func in enumerate(funcs) for ind, f in function.blocks(function.zero_argument_derivatives(func))] block2func, indices, values = zip(*blocks) if blocks else ([],[],[]) log.debug('integrating {} distinct blocks'.format('+'.join( str(block2func.count(ifunc)) for ifunc in range(len(funcs))))) if config.dot: function.Tuple(values).graphviz() if fcache is None: fcache = cache.WrapperCache() # To allocate (shared) memory for all block data we evaluate indexfunc to # build an nblocks x nelems+1 offset array, and nblocks index lists of # length nelems. offsets = numpy.zeros((len(blocks), len(self)+1), dtype=int) if blocks: sizefunc = function.stack([f.size for ifunc, ind, f in blocks]).simplified for ielem, elem in enumerate(self): n, = sizefunc.eval(_transforms=(elem.transform, elem.opposite), _cache=fcache, **arguments) offsets[:,ielem+1] = offsets[:,ielem] + n # Since several blocks may belong to the same function, we post process the # offsets to form consecutive intervals in longer arrays. The length of # these arrays is captured in the nfuncs-array nvals. nvals = numpy.zeros(len(funcs), dtype=int) for iblock, ifunc in enumerate(block2func): offsets[iblock] += nvals[ifunc] nvals[ifunc] = offsets[iblock,-1] # The data_index list contains shared memory index and value arrays for # each function argument. nprocs = min(config.nprocs, len(self)) empty = parallel.shempty if nprocs > 1 else numpy.empty data_index = [ (empty(n, dtype=float), empty((funcs[ifunc].ndim,n), dtype=int)) for ifunc, n in enumerate(nvals) ] # In a second, parallel element loop, valuefunc is evaluated to fill the # data part of data_index using the offsets array for location. Each # element has its own location so no locks are required. The index part of # data_index is filled in the same loop. It does not use valuefunc data but # benefits from parallel speedup. valueindexfunc = function.Tuple(function.Tuple([value]+list(index)) for value, index in zip(values, indices)) for ielem, elem in parallel.pariter(log.enumerate('elem', self), nprocs=nprocs): ipoints, iweights = ischeme[elem] if isinstance(ischeme,collections.abc.Mapping) else fcache[elem.reference.getischeme](ischeme) assert iweights is not None, 'no integration weights found' for iblock, (intdata, *indices) in enumerate(valueindexfunc.eval(_transforms=(elem.transform, elem.opposite), _points=ipoints, _cache=fcache, **arguments)): s = slice(*offsets[iblock,ielem:ielem+2]) data, index = data_index[block2func[iblock]] w_intdata = numeric.dot(iweights, intdata) data[s] = w_intdata.ravel() si = (slice(None),) + (_,) * (w_intdata.ndim-1) for idim, (ii,) in enumerate(indices): index[idim,s].reshape(w_intdata.shape)[...] = ii[si] si = si[:-1] log.debug('cache', fcache.stats) return data_index
[docs] @log.title @util.single_or_multiple def integrate(self, funcs, ischeme='gauss', degree=None, geometry=None, force_dense=False, fcache=None, edit=_identity, *, arguments=None): 'integrate' if degree is not None: ischeme += str(degree) iwscale = function.J(geometry, self.ndims) if geometry else 1 integrands = [function.asarray(edit(func * iwscale)) for func in funcs] data_index = self._integrate(integrands, ischeme, fcache, arguments) return [matrix.assemble(data, index, integrand.shape, force_dense) for integrand, (data,index) in zip(integrands, data_index)]
[docs] @log.title def integral(self, func, ischeme='gauss', degree=None, geometry=None, edit=_identity): 'integral' if degree is not None: ischeme += str(degree) iwscale = function.J(geometry, self.ndims) if geometry else 1 integrand = edit(func * iwscale) from . import solver return solver.Integral([((self, ischeme), integrand)])
[docs] def projection(self, fun, onto, geometry, **kwargs): 'project and return as function' weights = self.project(fun, onto, geometry, **kwargs) return onto.dot(weights)
[docs] @log.title def project(self, fun, onto, geometry, tol=0, ischeme='gauss', degree=None, droptol=1e-12, exact_boundaries=False, constrain=None, verify=None, ptype='lsqr', precon='diag', edit=_identity, *, arguments=None, **solverargs): 'L2 projection of function onto function space' log.debug('projection type:', ptype) if degree is not None: ischeme += str(degree) if constrain is None: constrain = util.NanVec(onto.shape[0]) else: constrain = constrain.copy() if exact_boundaries: constrain |= self.boundary.project(fun, onto, geometry, constrain=constrain, title='boundaries', ischeme=ischeme, tol=tol, droptol=droptol, ptype=ptype, edit=edit, arguments=arguments) assert isinstance(constrain, util.NanVec) assert constrain.shape == onto.shape[:1] avg_error = None # setting this depends on projection type if ptype == 'lsqr': assert ischeme is not None, 'please specify an integration scheme for lsqr-projection' fun2 = function.asarray(fun)**2 if len(onto.shape) == 1: Afun = function.outer(onto) bfun = onto * fun elif len(onto.shape) == 2: Afun = function.outer(onto).sum(2) bfun = function.sum(onto * fun, -1) if fun2.ndim: fun2 = fun2.sum(-1) else: raise Exception assert fun2.ndim == 0 A, b, f2, area = self.integrate([Afun,bfun,fun2,1], geometry=geometry, ischeme=ischeme, edit=edit, arguments=arguments, title='building system') N = A.rowsupp(droptol) if numpy.equal(b, 0).all(): constrain[~constrain.where&N] = 0 avg_error = 0. else: solvecons = constrain.copy() solvecons[~(constrain.where|N)] = 0 u = A.solve(b, solvecons, tol=tol, symmetric=True, precon=precon, **solverargs) constrain[N] = u[N] err2 = f2 - numpy.dot(2*b-A.matvec(u), u) # can be negative ~zero due to rounding errors avg_error = numpy.sqrt(err2) / area if err2 > 0 else 0 elif ptype == 'convolute': assert ischeme is not None, 'please specify an integration scheme for convolute-projection' if len(onto.shape) == 1: ufun = onto * fun afun = onto elif len(onto.shape) == 2: ufun = function.sum(onto * fun, axis=-1) afun = function.norm2(onto) else: raise Exception u, scale = self.integrate([ufun, afun], geometry=geometry, ischeme=ischeme, edit=edit, arguments=arguments) N = ~constrain.where & (scale > droptol) constrain[N] = u[N] / scale[N] elif ptype == 'nodal': ## data = function.Tuple([fun, onto]) ## F = W = 0 ## for elem in self: ## f, w = data(elem, 'bezier2') ## W += w.sum(axis=-1).sum(axis=0) ## F += numeric.contract(f[:,_,:], w, axis=[0,2]) ## I = (W!=0) F = numpy.zeros(onto.shape[0]) W = numpy.zeros(onto.shape[0]) I = numpy.zeros(onto.shape[0], dtype=bool) fun = function.zero_argument_derivatives(function.asarray(fun)) data = function.Tuple(function.Tuple([fun, onto_f.simplified, function.Tuple(onto_ind)]) for onto_ind, onto_f in function.blocks(function.zero_argument_derivatives(onto))) for elem in self: ipoints, iweights = elem.getischeme('bezier2') for fun_, onto_f_, onto_ind_ in data.eval(_transforms=(elem.transform, elem.opposite), _points=ipoints, **arguments or {}): onto_f_ = onto_f_.swapaxes(0,1) # -> dof axis, point axis, ... indfun_ = fun_[(slice(None),)+numpy.ix_(*onto_ind_[1:])] assert onto_f_.shape[0] == len(onto_ind_[0]) assert onto_f_.shape[1:] == indfun_.shape W[onto_ind_[0]] += onto_f_.reshape(onto_f_.shape[0],-1).sum(1) F[onto_ind_[0]] += (onto_f_ * indfun_).reshape(onto_f_.shape[0],-1).sum(1) I[onto_ind_[0]] = True I[constrain.where] = False constrain[I] = F[I] / W[I] else: raise Exception('invalid projection {!r}'.format(ptype)) numcons = constrain.where.sum() info = 'constrained {}/{} dofs'.format(numcons, constrain.size) if avg_error is not None: info += ', error {:.2e}/area'.format(avg_error) log.info(info) if verify is not None: assert numcons == verify, 'number of constraints does not meet expectation: {} != {}'.format(numcons, verify) return constrain
@cache.property def simplex(self): simplices = [simplex for elem in self for simplex in elem.simplices] return UnstructuredTopology(self.ndims, simplices)
[docs] def refined_by(self, refine): 'create refined space by refining dofs in existing one' refine = set(item.transform if isinstance(item,element.Element) else item for item in refine) refined = [] for elem in self: if elem.transform in refine: refined.extend(elem.children) else: refined.append(elem) return self.hierarchical(refined, precise=True)
def hierarchical(self, refined, precise=False): return HierarchicalTopology(self, refined, precise) @property def refined(self): return RefinedTopology(self)
[docs] def refine(self, n): 'refine entire topology n times' if numpy.iterable(n): assert len(n) == self.ndims assert all(ni == n[0] for ni in n) n = n[0] return self if n <= 0 else self.refined.refine(n-1)
[docs] @log.title def trim(self, levelset, maxrefine, ndivisions=8, name='trimmed', leveltopo=None, *, arguments=None): 'trim element along levelset' if arguments is None: arguments = {} fcache = cache.WrapperCache() levelset = function.zero_argument_derivatives(levelset).simplified if leveltopo is None: ischeme = 'vertex{}'.format(maxrefine) refs = [elem.reference.trim(levelset.eval(_transforms=(elem.transform, elem.opposite), _points=fcache[elem.reference.getischeme](ischeme)[0], _cache=fcache, **arguments), maxrefine=maxrefine, ndivisions=ndivisions) for elem in log.iter('elem', self)] else: log.info('collecting leveltopo elements') bins = [[] for ielem in range(len(self))] for elem in leveltopo: ielem, tail = transform.lookup_item(elem.transform, self.edict) bins[ielem].append(tail) refs = [] for elem, ctransforms in log.zip('elem', self, bins): levels = numpy.empty(elem.reference.nvertices_by_level(maxrefine)) cover = list(fcache[elem.reference.vertex_cover](tuple(sorted(ctransforms)), maxrefine)) # confirm cover and greedily optimize order mask = numpy.ones(len(levels), dtype=bool) while mask.any(): imax = numpy.argmax([mask[indices].sum() for trans, points, indices in cover]) trans, points, indices = cover.pop(imax) levels[indices] = levelset.eval(_transforms=(elem.transform + trans,), _points=points, _cache=fcache, **arguments) mask[indices] = False refs.append(elem.reference.trim(levels, maxrefine=maxrefine, ndivisions=ndivisions)) log.debug('cache', fcache.stats) return SubsetTopology(self, refs, newboundary=name)
[docs] def subset(self, elements, newboundary=None, strict=False): 'intersection' refs = [element.EmptyReference(self.ndims)] * len(self) for elem in elements: try: ielem = self.edict[elem.transform] except KeyError: assert not strict, 'elements do not form a strict subset' else: ref = self.elements[ielem].reference & elem.reference if strict: assert ref == elem.reference, 'elements do not form a strict subset' refs[ielem] = ref if not any(refs): return EmptyTopology(self.ndims) return SubsetTopology(self, refs, newboundary)
def withgroups(self, vgroups={}, bgroups={}, igroups={}, pgroups={}): return WithGroupsTopology(self, vgroups, bgroups, igroups, pgroups) if vgroups or bgroups or igroups or pgroups else self withsubdomain = lambda self, **kwargs: self.withgroups(vgroups=kwargs) withboundary = lambda self, **kwargs: self.withgroups(bgroups=kwargs) withinterfaces = lambda self, **kwargs: self.withgroups(igroups=kwargs) withpoints = lambda self, **kwargs: self.withgroups(pgroups=kwargs) @log.title @util.single_or_multiple def elem_project(self, funcs, degree, ischeme=None, check_exact=False, *, arguments=None): if arguments is None: arguments = {} if ischeme is None: ischeme = 'gauss{}'.format(degree*2) blocks = function.Tuple([function.Tuple([function.Tuple((function.Tuple(ind), f.simplified)) for ind, f in function.blocks(function.zero_argument_derivatives(func))]) for func in funcs]) bases = {} extractions = [[] for ifunc in range(len(funcs))] for elem in log.iter('elem', self): try: points, projector, basis = bases[elem.reference] except KeyError: points, weights = elem.reference.getischeme(ischeme) coeffs = elem.reference.get_poly_coeffs('bernstein', degree=degree) basis = numeric.poly_eval(coeffs[_], points) npoints, nfuncs = basis.shape A = numeric.dot(weights, basis[:,:,_] * basis[:,_,:]) projector = numpy.linalg.solve(A, basis.T * weights) bases[elem.reference] = points, projector, basis for ifunc, ind_val in enumerate(blocks.eval(_transforms=(elem.transform, elem.opposite), _points=points, **arguments)): if len(ind_val) == 1: (allind, sumval), = ind_val else: allind, where = zip(*[numpy.unique([i for ind, val in ind_val for i in ind[iax]], return_inverse=True) for iax in range(funcs[ifunc].ndim)]) sumval = numpy.zeros([len(n) for n in (points,) + allind]) for ind, val in ind_val: I, where = zip(*[(w[:len(n)], w[len(n):]) for w, n in zip(where, ind)]) numpy.add.at(sumval, numpy.ix_(range(len(points)), *I), val) assert not any(where) ex = numeric.dot(projector, sumval) if check_exact: numpy.testing.assert_almost_equal(sumval, numeric.dot(basis, ex), decimal=15) extractions[ifunc].append((allind, ex)) return extractions @log.title def volume(self, geometry, ischeme='gauss', degree=1, *, arguments=None): return self.integrate(1, geometry=geometry, ischeme=ischeme, degree=degree, arguments=arguments) @log.title def check_boundary(self, geometry, elemwise=False, ischeme='gauss', degree=1, tol=1e-15, print=print, *, arguments=None): if elemwise: for elem in self: elem.reference.check_edges(tol=tol, print=print) volume = self.volume(geometry, ischeme=ischeme, degree=degree, arguments=arguments) zeros, volumes = self.boundary.integrate([geometry.normal(), geometry * geometry.normal()], geometry=geometry, ischeme=ischeme, degree=degree, arguments=arguments) if numpy.greater(abs(zeros), tol).any(): print('divergence check failed: {} != 0'.format(zeros)) if numpy.greater(abs(volumes - volume), tol).any(): print('divergence check failed: {} != {}'.format(volumes, volume)) def volume_check(self, geometry, ischeme='gauss', degree=1, decimal=15, *, arguments=None): warnings.deprecation('volume_check will be removed in future, us check_boundary instead') self.check_boundary(geometry=geometry, ischeme=ischeme, degree=degree, tol=10**-decimal, arguments=arguments) def indicator(self, subtopo): if isinstance(subtopo, str): subtopo = self[subtopo] transforms = tuple(sorted(elem.transform for elem in self)) values = numeric.const([int(trans in subtopo.edict) for trans in transforms]) assert len(subtopo) == values.sum(0), '{} is not a proper subtopology of {}'.format(subtopo, self) return function.Get(values, axis=0, item=function.FindTransform(transforms, function.Promote(self.ndims, trans=function.TRANS))) def select(self, indicator, ischeme='bezier2', *, arguments=None): values = self.elem_eval(indicator, ischeme, separate=True, arguments=arguments) selected = [elem for elem, value in zip(self, values) if numpy.greater(value, 0).any()] return UnstructuredTopology(self.ndims, selected) def prune_basis(self, basis): used = numpy.zeros(len(basis), dtype=bool) for axes, func in function.blocks(basis): dofmap = axes[0] for elem in self: dofs = dofmap.eval(_transforms=(elem.transform, elem.opposite)) used[dofs] = True return function.mask(basis, used) def locate(self, geom, points, ischeme='vertex', scale=1, tol=1e-12, eps=0, maxiter=100, *, arguments=None): nprocs = min(config.nprocs, len(self)) if arguments is None: arguments = {} if geom.ndim == 0: geom = geom[_] points = points[...,_] assert geom.shape == (self.ndims,) points = numpy.asarray(points, dtype=float) assert points.ndim == 2 and points.shape[1] == self.ndims vertices = self.elem_eval(geom, ischeme=ischeme, separate=True, arguments=arguments) bboxes = numpy.array([numpy.mean(v,axis=0) * (1-scale) + numpy.array([numpy.min(v,axis=0), numpy.max(v,axis=0)]) * scale for v in vertices]) # nelems x {min,max} x ndims vref = element.getsimplex(0) ielems = parallel.shempty(len(points), dtype=int) xis = parallel.shempty((len(points),len(geom)), dtype=float) for ipoint, point in parallel.pariter(log.enumerate('point', points), nprocs=nprocs): ielemcandidates, = numpy.logical_and(numpy.greater_equal(point, bboxes[:,0,:]), numpy.less_equal(point, bboxes[:,1,:])).all(axis=-1).nonzero() for ielem in sorted(ielemcandidates, key=lambda i: numpy.linalg.norm(bboxes[i].mean(0)-point)): converged = False elem = self.elements[ielem] xi, w = elem.reference.getischeme('gauss1') xi = (numpy.dot(w,xi) / w.sum())[_] if len(xi) > 1 else xi.copy() J = function.localgradient(geom, self.ndims) geom_J = function.Tuple((function.zero_argument_derivatives(geom), function.zero_argument_derivatives(J))).simplified for iiter in range(maxiter): point_xi, J_xi = geom_J.eval(_transforms=(elem.transform, elem.opposite), _points=xi, **arguments) err = numpy.linalg.norm(point - point_xi) if err < tol: converged = True break if iiter and err > prev_err: break prev_err = err xi += numpy.linalg.solve(J_xi, point - point_xi) if converged and elem.reference.inside(xi[0], eps=eps): ielems[ipoint] = ielem xis[ipoint], = xi break else: raise LocateError('failed to locate point: {}'.format(point)) pelems = [] for ielem, xi in zip(ielems, xis): elem = self.elements[ielem] trans = transform.Matrix(numpy.empty((len(xi), 0)), xi) pelems.append(element.Element(vref, elem.transform + (trans,), elem.opposite and elem.opposite + (trans,), oriented=True)) return UnstructuredTopology(0, pelems) def supp(self, basis, mask=None): if mask is None: mask = numpy.ones(len(basis), dtype=bool) elif isinstance(mask, list) or numeric.isarray(mask) and mask.dtype == int: tmp = numpy.zeros(len(basis), dtype=bool) tmp[mask] = True mask = tmp else: assert numeric.isarray(mask) and mask.dtype == bool and mask.shape == basis.shape[:1] indfunc = function.Tuple([ind[0] for ind, f in function.blocks(basis)]) subset = [] for elem in self: try: ind, = numpy.concatenate(indfunc.eval(_transforms=(elem.transform, elem.opposite)), axis=1) except function.EvaluationError: pass else: if mask[ind].any(): subset.append(elem) if not subset: return EmptyTopology(self.ndims) return self.subset(subset, newboundary='supp', strict=True) def revolved(self, geom): assert geom.ndim == 1 revdomain = self * RevolutionTopology() angle, = function.rootcoords(1) geom, angle = function.bifurcate(geom, angle) revgeom = function.concatenate([geom[0] * function.trignormal(angle), geom[1:]]) simplify = cache.replace(lambda op: function.zeros(()) if op is angle else None) return revdomain, revgeom, simplify def extruded(self, geom, nelems, periodic=False, bnames=('front','back')): assert geom.ndim == 1 root = transform.RootTrans('extrude', shape=[nelems if periodic else 0]) extopo = self * StructuredLine(root, i=0, j=nelems, periodic=periodic, bnames=bnames) exgeom = function.concatenate(function.bifurcate(geom, function.rootcoords(1))) return extopo, exgeom
class LocateError(Exception): pass
[docs]class WithGroupsTopology(Topology): 'item topology' def __init__(self, basetopo, vgroups={}, bgroups={}, igroups={}, pgroups={}): assert vgroups or bgroups or igroups or pgroups self.basetopo = basetopo self.vgroups = vgroups.copy() self.bgroups = bgroups.copy() self.igroups = igroups.copy() self.pgroups = pgroups.copy() super().__init__(basetopo.ndims) assert all(topo is Ellipsis or isinstance(topo, str) or isinstance(topo, Topology) and topo.ndims == basetopo.ndims and set(self.basetopo.edict).issuperset(topo.edict) for topo in self.vgroups.values()) def withgroups(self, vgroups={}, bgroups={}, igroups={}, pgroups={}): args = [] for groups, newgroups in (self.vgroups,vgroups), (self.bgroups,bgroups), (self.igroups,igroups), (self.pgroups,pgroups): groups = groups.copy() groups.update(newgroups) args.append(groups) return WithGroupsTopology(self.basetopo, *args) def __iter__(self): return iter(self.basetopo) def __len__(self): return len(self.basetopo) def getitem(self, item): if not isinstance(item, str): return self.basetopo.getitem(item) try: itemtopo = self.vgroups[item] except KeyError: return self.basetopo.getitem(item) else: return itemtopo if isinstance(itemtopo, Topology) else self.basetopo[itemtopo] @property def edict(self): return self.basetopo.edict @property def border_transforms(self): return self.basetopo.border_transforms @property def connectivity(self): return self.basetopo.connectivity @property def structure(self): return self.basetopo.structure @property def elements(self): return self.basetopo.elements @property def boundary(self): return self.basetopo.boundary.withgroups(self.bgroups) @property def interfaces(self): baseitopo = self.basetopo.interfaces # last minute orientation fix igroups = {name: UnstructuredTopology(self.ndims-1, [elem if elem.transform in baseitopo.edict else elem.flipped for elem in elems]) for name, elems in self.igroups.items()} return baseitopo.withgroups(igroups) @property def points(self): return self.basetopo.points.withgroups(self.pgroups) def basis(self, name, *args, **kwargs): return self.basetopo.basis(name, *args, **kwargs) @cache.property def refined(self): groups = [{name: topo.refined if isinstance(topo,Topology) else topo for name, topo in groups.items()} for groups in (self.vgroups,self.bgroups,self.igroups,self.pgroups)] return self.basetopo.refined.withgroups(*groups)
[docs]class OppositeTopology(Topology): 'opposite topology' def __init__(self, basetopo): self.basetopo = basetopo super().__init__(basetopo.ndims) def getitem(self, item): return ~(self.basetopo.getitem(item)) def __iter__(self): return (elem.flipped for elem in self.basetopo) def __len__(self): return len(self.basetopo) @cache.property def elements(self): return tuple(self) def __invert__(self): return self.basetopo
[docs]class EmptyTopology(Topology): 'empty topology' def __iter__(self): return iter([]) def __len__(self): return 0 def __or__(self, other): assert self.ndims == other.ndims return other def __rsub__(self, other): return other @property def elements(self): return ()
[docs]class Point(Topology): 'point' def __init__(self, trans, opposite=None): assert trans[-1].fromdims == 0 self.elem = element.Element(element.getsimplex(0), trans, opposite, oriented=True) super().__init__(ndims=0) def __iter__(self): yield self.elem @property def elements(self): return self.elem,
[docs]class StructuredLine(Topology): 'structured topology' def __init__(self, root, i, j, periodic=False, bnames=None): 'constructor' assert isinstance(i,int) and isinstance(j,int) and j > i assert not bnames or len(bnames) == 2 and all(isinstance(bname,str) for bname in bnames) assert isinstance(root, transform.TransformItem) self.root = root self.i = i self.j = j self.periodic = periodic self.bnames = bnames or () super().__init__(ndims=1) @cache.property def _transforms(self): # one extra left and right for opposites, even if periodic=True return tuple((self.root, transform.Shift([float(offset)])) for offset in range(self.i-1, self.j+1)) def __iter__(self): reference = element.getsimplex(1) return (element.Element(reference, trans) for trans in self._transforms[1:-1]) def __len__(self): return self.j - self.i @cache.property def elements(self): return tuple(self) @cache.property def boundary(self): if self.periodic: return EmptyTopology(ndims=0) transforms = self._transforms right, left = element.LineReference().edge_transforms bnd = Point(transforms[1] + (left,), transforms[0] + (right,)), Point(transforms[-2] + (right,), transforms[-1] + (left,)) return UnionTopology(bnd, self.bnames) @cache.property def interfaces(self): transforms = self._transforms right, left = element.LineReference().edge_transforms points = [Point(trans + (left,), opp + (right,)) for trans, opp in zip(transforms[2:-1], transforms[1:-2])] if self.periodic: points.append(Point(transforms[1] + (left,), transforms[-2] + (right,))) return UnionTopology(points) @classmethod def _bernstein_poly(cls, degree): 'bernstein polynomial coefficients' @classmethod def _spline_coeffs(cls, p, n): 'spline polynomial coefficients' assert p >= 0, 'invalid polynomial degree {}'.format(p) if p == 0: assert n == -1 return numpy.array([[[1.]]]) assert 1 <= n < 2*p extractions = numpy.empty((n, p+1, p+1)) extractions[0] = numpy.eye(p+1) for i in range(1, n): extractions[i] = numpy.eye(p+1) for j in range(2, p+1): for k in reversed(range(j, p+1)): alpha = 1. / min(2+k-j, n-i+1) extractions[i-1,:,k] = alpha * extractions[i-1,:,k] + (1-alpha) * extractions[i-1,:,k-1] extractions[i,-j-1:-1,-j-1] = extractions[i-1,-j:,-1] # magic bernstein triangle poly = numpy.zeros([p+1,p+1], dtype=int) for k in range(p//2+1): poly[k,k] = root = (-1)**p if k == 0 else (poly[k-1,k] * (k*2-1-p)) / k for i in range(k+1,p+1-k): poly[i,k] = poly[k,i] = root = (root * (k+i-p-1)) / i poly = poly[::-1].astype(float) return numeric.const(numeric.contract(extractions[:,_,:,:], poly[_,:,_,:], axis=-1).transpose(0,2,1), copy=False)
[docs] def basis_spline(self, degree, periodic=None, removedofs=None): 'spline from vertices' if periodic is None: periodic = self.periodic if numpy.iterable(degree): degree, = degree if numpy.iterable(removedofs): removedofs, = removedofs strides = 1, 1 shape = len(self), degree+1 ndofs = sum(s*(n-1) for s, n in zip(strides, shape))+1 dofs = numpy.arange(ndofs) if periodic and degree > 0: assert ndofs >= 2 * degree dofs[-degree:] = dofs[:degree] ndofs -= degree dofs = numpy.lib.stride_tricks.as_strided(dofs, shape=shape, strides=tuple(s*dofs.strides[0] for s in strides)) dofs = numeric.const(dofs, copy=False) p = degree n = 2*p-1 nelems = len(self) if periodic: if nelems == 1: # periodicity on one element can only mean a constant coeffs = [self._spline_coeffs(0, n)] dofs = numeric.const([[0]], copy=False) else: coeffs = list(self._spline_coeffs(p, n)[p-1:p]) * nelems else: coeffs = list(self._spline_coeffs(p, min(nelems,n))) if len(coeffs) < nelems: coeffs = coeffs[:p-1] + coeffs[p-1:p] * (nelems-2*(p-1)) + coeffs[p:] coeffs = numeric.const(coeffs, copy=False) func = function.polyfunc(coeffs, dofs, ndofs, self._transforms[1:-1], issorted=False) if not removedofs: return func mask = numpy.ones(ndofs, dtype=bool) mask[list(removedofs)] = False return function.mask(func, mask)
[docs] def basis_discont(self, degree): 'discontinuous shape functions' ref = element.LineReference() coeffs = [ref.get_poly_coeffs('bernstein', degree=degree)]*len(self) ndofs = ref.get_ndofs(degree) dofs = numeric.const(numpy.arange(ndofs*len(self), dtype=int).reshape(len(self), ndofs), copy=False) return function.polyfunc(coeffs, dofs, ndofs*len(self), self._transforms[1:-1], issorted=False)
[docs] def basis_std(self, degree, periodic=None, removedofs=None): 'spline from vertices' if periodic is None: periodic = self.periodic strides = max(1, degree), 1 shape = len(self), degree+1 ndofs = sum(s*(n-1) for s, n in zip(strides, shape))+1 dofs = numpy.arange(ndofs) if periodic and degree > 0: dofs[-1] = dofs[0] ndofs -= 1 dofs = numpy.lib.stride_tricks.as_strided(dofs, shape=shape, strides=tuple(s*dofs.strides[0] for s in strides)) dofs = numeric.const(dofs, copy=False) coeffs = [element.LineReference().get_poly_coeffs('bernstein', degree=degree)]*len(self) func = function.polyfunc(coeffs, dofs, ndofs, self._transforms[1:-1], issorted=False) if not removedofs: return func mask = numpy.ones(ndofs, dtype=bool) mask[list(removedofs)] = False return function.mask(func, mask)
def __str__(self): 'string representation' return '{}({}:{})'.format(self.__class__.__name__, self.i, self.j)
[docs]class StructuredTopology(Topology): 'structured topology' def __init__(self, root, axes, nrefine=0, bnames=None): 'constructor' self.root = root self.axes = tuple(axes) self.nrefine = nrefine self.shape = tuple(axis.j - axis.i for axis in self.axes if axis.isdim) if bnames is None: assert len(self.axes) <= 3 bnames = ('left', 'right'), ('bottom', 'top'), ('front', 'back') bnames = itertools.chain.from_iterable(n for axis, n in zip(self.axes, bnames) if axis.isdim and not axis.isperiodic) self._bnames = tuple(bnames) assert len(self._bnames) == sum(2 for axis in self.axes if axis.isdim and not axis.isperiodic) assert all(isinstance(bname,str) for bname in self._bnames) super().__init__(len(self.shape)) def __iter__(self): reference = util.product(element.getsimplex(1 if axis.isdim else 0) for axis in self.axes) return (element.Element(reference, trans, opp, oriented=True) for trans, opp in zip(self._transform.flat, self._opposite.flat)) def __len__(self): return numpy.prod(self.shape, dtype=int) def getitem(self, item): if not isinstance(item, tuple): return EmptyTopology(self.ndims) assert all(isinstance(it,slice) for it in item) and len(item) <= self.ndims if all(it == slice(None) for it in item): # shortcut return self axes = [] idim = 0 for axis in self.axes: if axis.isdim and idim < len(item): s = item[idim] start, stop, stride = s.indices(axis.j - axis.i) assert stride == 1 assert stop > start if start > 0 or stop < axis.j - axis.i: axis = DimAxis(axis.i+start, axis.i+stop, isperiodic=False) idim += 1 axes.append(axis) return StructuredTopology(self.root, axes, self.nrefine, bnames=self._bnames) @cache.property def elements(self): return tuple(self) @property def periodic(self): dimaxes = (axis for axis in self.axes if axis.isdim) return tuple(idim for idim, axis in enumerate(dimaxes) if axis.isdim and axis.isperiodic) @staticmethod def mktransforms(axes, root, nrefine): assert nrefine >= 0 updim = [] rmdims = numpy.zeros(len(axes), dtype=bool) for order, side, idim in sorted((axis.ibound, axis.side, idim) for idim, axis in enumerate(axes) if not axis.isdim): ref = util.product(element.getsimplex(0 if rmdim else 1) for rmdim in rmdims) iedge = (idim - rmdims[:idim].sum()) * 2 + 1 - side updim.append(ref.edge_transforms[iedge]) rmdims[idim] = True grid = [numpy.arange(axis.i>>nrefine, ((axis.j-1)>>nrefine)+1) if axis.isdim else numpy.array([(axis.i-1 if axis.side else axis.j)>>nrefine]) for axis in axes] indices = numeric.broadcast(*numeric.ix(grid)) transforms = numeric.asobjvector([transform.Shift(numpy.array(index, dtype=float))] for index in log.iter('elem', indices, indices.size)).reshape(indices.shape) if nrefine: scales = numeric.asobjvector([trans] for trans in (element.LineReference()**len(axes)).child_transforms).reshape((2,)*len(axes)) for irefine in log.range('level', nrefine-1, -1, -1): offsets = numpy.array([r[0] for r in grid]) grid = [numpy.arange(axis.i>>irefine,((axis.j-1)>>irefine)+1) if axis.isdim else numpy.array([(axis.i-1 if axis.side else axis.j)>>irefine]) for axis in axes] A = transforms[numpy.broadcast_arrays(*numeric.ix(r//2-o for r, o in zip(grid, offsets)))] B = scales[numpy.broadcast_arrays(*numeric.ix(r%2 for r in grid))] transforms = A + B shape = tuple(axis.j - axis.i for axis in axes if axis.isdim) return numeric.asobjvector(transform.canonical([root] + trans + updim) for trans in log.iter('canonical', transforms.flat)).reshape(shape) @cache.property @log.title def _transform(self): return self.mktransforms(self.axes, self.root, self.nrefine) @cache.property @log.title def _opposite(self): nbounds = len(self.axes) - self.ndims if nbounds == 0: return self._transform axes = [BndAxis(axis.i, axis.j, axis.ibound, not axis.side) if not axis.isdim and axis.ibound==nbounds-1 else axis for axis in self.axes] return self.mktransforms(axes, self.root, self.nrefine) @property def structure(self): warnings.deprecation('topology.structure will be removed in future') reference = util.product(element.getsimplex(1 if axis.isdim else 0) for axis in self.axes) return numeric.asobjvector(element.Element(reference, trans, opp, oriented=True) for trans, opp in numpy.broadcast(self._transform, self._opposite)).reshape(self.shape) @cache.property def connectivity(self): connectivity = numpy.empty(self.shape+(self.ndims,2), dtype=int) connectivity[...] = -1 ielems = numpy.arange(len(self)).reshape(self.shape) for idim in range(self.ndims): s = (slice(None),)*idim s1 = s + (slice(1,None),) s2 = s + (slice(0,-1),) connectivity[s2+(...,idim,0)] = ielems[s1] connectivity[s1+(...,idim,1)] = ielems[s2] if idim in self.periodic: connectivity[s+(-1,...,idim,0)] = ielems[s+(0,)] connectivity[s+(0,...,idim,1)] = ielems[s+(-1,)] return connectivity.reshape(len(self), self.ndims*2) @cache.property def boundary(self): 'boundary' nbounds = len(self.axes) - self.ndims btopo = EmptyTopology(self.ndims-1) jdim = 0 for idim, axis in enumerate(self.axes): if not axis.isdim or axis.isperiodic: continue btopos = [ StructuredTopology( root=self.root, axes=self.axes[:idim] + (BndAxis(n,n if not axis.isperiodic else 0,nbounds,side),) + self.axes[idim+1:], nrefine=self.nrefine, bnames=self._bnames[:jdim*2]+self._bnames[jdim*2+2:]) for side, n in enumerate((axis.i,axis.j)) ] btopo |= UnionTopology(btopos, self._bnames[jdim*2:jdim*2+2]) jdim += 1 return btopo @cache.property def interfaces(self): 'interfaces' assert self.ndims > 0, 'zero-D topology has no interfaces' itopos = [] nbounds = len(self.axes) - self.ndims for idim, axis in enumerate(self.axes): if not axis.isdim: continue bndprops = [BndAxis(i, i, ibound=nbounds, side=True) for i in range(axis.i+1, axis.j)] if axis.isperiodic: assert axis.i == 0 bndprops.append(BndAxis(axis.j, 0, ibound=nbounds, side=True)) itopos.append(EmptyTopology(self.ndims-1) if not bndprops else UnionTopology(StructuredTopology(self.root, self.axes[:idim] + (axis,) + self.axes[idim+1:], self.nrefine) for axis in bndprops)) assert len(itopos) == self.ndims return UnionTopology(itopos, names=['dir{}'.format(idim) for idim in range(self.ndims)]) def _basis_spline(self, degree, knotvalues=None, knotmultiplicities=None, periodic=None): 'spline with structure information' if periodic is None: periodic = self.periodic if numeric.isint(degree): degree = [degree]*self.ndims assert len(degree)==self.ndims if knotvalues is None: knotvalues = [None]*self.ndims if knotmultiplicities is None: knotmultiplicities = [None]*self.ndims vertex_structure = numpy.array(0) stdelems = [] dofshape = [] slices = [] cache = {} for idim in range(self.ndims): p = degree[idim] n = self.shape[idim] isperiodic = idim in periodic k = knotvalues[idim] if k is None: #Defaults to uniform spacing k = numpy.arange(n+1) else: k = numpy.asarray(k) m = knotmultiplicities[idim] if m is None: #Defaults to open spline without internal repetitions m = numpy.array([p+1]+[1]*(n-1)+[p+1]) if not isperiodic else numpy.ones(n+1,dtype=int) else: m = numpy.array(m) #Make copy to prevent overwriting of data assert min(m)>0 and max(m)<=p+1, 'Incorrect multiplicity encountered' assert len(k)==len(m), 'Length mismatch between knots vector and knot multiplicities vector' assert len(k)==n+1, 'Knot vector size does not match the topology size' if not isperiodic: nd = sum(m)-p-1 npre = p+1-m[0] #Number of knots to be appended to front npost = p+1-m[-1] #Number of knots to be appended to rear m[0] = m[-1] = p+1 else: assert m[0]==m[-1], 'Periodic spline multiplicity expected' assert m[0]<p+1, 'Endpoint multiplicity for periodic spline should be p or smaller' nd = sum(m[:-1]) npre = npost = 0 k = numpy.concatenate([k[-p-1:-1]+k[0]-k[-1], k, k[1:1+p]-k[0]+k[-1]]) m = numpy.concatenate([m[-p-1:-1], m, m[1:1+p]]) km = numpy.array([ki for ki,mi in zip(k,m) for cnt in range(mi)],dtype=float) assert len(km)==sum(m) assert nd>0, 'No basis functions defined. Knot vector too short.' stdelems_i = [] slices_i = [] offsets = numpy.cumsum(m[:-1])-p if isperiodic: offsets = offsets[p:-p] offset0 = offsets[0]+npre for offset in offsets: start = max(offset0-offset,0) #Zero unless prepending influence stop = p+1-max(offset-offsets[-1]+npost,0) #Zero unless appending influence slices_i.append(slice(offset-offset0+start,offset-offset0+stop)) lknots = km[offset:offset+2*p] - km[offset] #Copy operation required if p: #Normalize for optimized caching lknots /= lknots[-1] key = (tuple(numeric.round(lknots*numpy.iinfo(numpy.int32).max)), p) try: coeffs = cache[key] except KeyError: coeffs = numeric.const(self._localsplinebasis(lknots, p).T, copy=False) cache[key] = coeffs stdelems_i.append(coeffs[start:stop]) stdelems.append(stdelems_i) numbers = numpy.arange(nd) if isperiodic: numbers = numpy.concatenate([numbers,numbers[:p]]) vertex_structure = vertex_structure[...,_]*nd+numbers dofshape.append(nd) slices.append(slices_i) #Cache effectivity log.debug('Local knot vector cache effectivity: {}'.format(100*(1.-len(cache)/float(sum(self.shape))))) # deduplicate stdelems and compute tensorial products `unique` with indices `index` # such that unique[index[i,j]] == poly_outer_product(stdelems[0][i], stdelems[1][j]) index = numpy.array(0) for stdelems_i in stdelems: unique_i = tuple(set(stdelems_i)) unique = unique_i if not index.ndim \ else [numeric.poly_outer_product(a, b) for a in unique for b in unique_i] index = index[...,_] * len(unique_i) + tuple(map(unique_i.index, stdelems_i)) coeffs = [unique[i] for i in index.flat] dofmap = [numeric.const(vertex_structure[S].ravel(), copy=False) for S in itertools.product(*slices)] return coeffs, dofmap, dofshape
[docs] def basis_spline(self, degree, knotvalues=None, knotmultiplicities=None, periodic=None, removedofs=None): 'spline basis' if removedofs == None: removedofs = [None] * self.ndims else: assert len(removedofs) == self.ndims coeffs, dofmap, dofshape = self._basis_spline(degree=degree, knotvalues=knotvalues, knotmultiplicities=knotmultiplicities, periodic=periodic) func = function.polyfunc(coeffs, dofmap, util.product(dofshape), (elem.transform for elem in self), issorted=False) if not any(removedofs): return func mask = numpy.ones((), dtype=bool) for idofs, ndofs in zip(removedofs, dofshape): mask = mask[...,_].repeat(ndofs, axis=-1) if idofs: mask[..., [numeric.normdim(ndofs,idof) for idof in idofs]] = False assert mask.shape == tuple(dofshape) return function.mask(func, mask.ravel())
def basis_bspline(self, *args, **kwargs): warnings.deprecation('basis "bspline" has been merged with "spline"') return self.basis_spline(*args, **kwargs) @staticmethod def _localsplinebasis (lknots, p): assert numeric.isarray(lknots), 'Local knot vector should be numpy array' assert len(lknots)==2*p, 'Expected 2*p local knots' #Based on Algorithm A2.2 Piegl and Tiller N = [None]*(p+1) N[0] = numpy.poly1d([1.]) if p > 0: assert numpy.less(lknots[:-1]-lknots[1:], numpy.spacing(1)).all(), 'Local knot vector should be non-decreasing' assert lknots[p]-lknots[p-1]>numpy.spacing(1), 'Element size should be positive' lknots = lknots.astype(float) xi = numpy.poly1d([lknots[p]-lknots[p-1],lknots[p-1]]) left = [None]*p right = [None]*p for i in range(p): left[i] = xi - lknots[p-i-1] right[i] = -xi + lknots[p+i] saved = 0. for r in range(i+1): temp = N[r]/(lknots[p+r]-lknots[p+r-i-1]) N[r] = saved+right[r]*temp saved = left[i-r]*temp N[i+1] = saved assert all(Ni.order==p for Ni in N) return numeric.const([Ni.coeffs for Ni in N]).T[::-1]
[docs] def basis_discont(self, degree): 'discontinuous shape functions' ref = util.product([element.LineReference()]*self.ndims) coeffs = [ref.get_poly_coeffs('bernstein', degree=degree)]*len(self) ndofs = ref.get_ndofs(degree) dofs = numeric.const(numpy.arange(ndofs*len(self), dtype=int).reshape(len(self), ndofs), copy=False) return function.polyfunc(coeffs, dofs, ndofs*len(self), (elem.transform for elem in self), issorted=False)
[docs] def basis_std(self, degree, removedofs=None, periodic=None): 'spline from vertices' if periodic is None: periodic = self.periodic if numeric.isint(degree): degree = (degree,) * self.ndims if removedofs == None: removedofs = [None] * self.ndims else: assert len(removedofs) == self.ndims dofshape = [] slices = [] vertex_structure = numpy.array(0) for idim in range(self.ndims): periodic_i = idim in periodic n = self.shape[idim] p = degree[idim] nd = n * p + 1 numbers = numpy.arange(nd) if periodic_i and p > 0: numbers[-1] = numbers[0] nd -= 1 vertex_structure = vertex_structure[...,_] * nd + numbers dofshape.append(nd) slices.append([slice(p*i,p*i+p+1) for i in range(n)]) lineref = element.LineReference() coeffs = [functools.reduce(numeric.poly_outer_product, (lineref.get_poly_coeffs('bernstein', degree=p) for p in degree))]*len(self) dofs = [numeric.const(vertex_structure[S].ravel(), copy=False) for S in numpy.broadcast(*numpy.ix_(*slices))] func = function.polyfunc(coeffs, dofs, numpy.product(dofshape), self._transform.ravel(), issorted=False) if not any(removedofs): return func mask = numpy.ones((), dtype=bool) for idofs, ndofs in zip(removedofs, dofshape): mask = mask[...,_].repeat(ndofs, axis=-1) if idofs: mask[..., [numeric.normdim(ndofs,idof) for idof in idofs]] = False assert mask.shape == tuple(dofshape) return function.mask(func, mask.ravel())
@property def refined(self): 'refine non-uniformly' axes = [DimAxis(i=axis.i*2,j=axis.j*2,isperiodic=axis.isperiodic) if axis.isdim else BndAxis(i=axis.i*2,j=axis.j*2,ibound=axis.ibound,side=axis.side) for axis in self.axes] return StructuredTopology(self.root, axes, self.nrefine+1, bnames=self._bnames) def __str__(self): 'string representation' return '{}({})'.format(self.__class__.__name__, 'x'.join(str(n) for n in self.shape))
[docs]class UnstructuredTopology(Topology): 'unstructured topology' def __init__(self, ndims, elements): self.elements = tuple(elements) assert all(elem.ndims == ndims for elem in self.elements) super().__init__(ndims) @cache.property @log.title def connectivity(self): edges = {} connectivity = [] for ielem, elem in log.enumerate('elem', self): connectivity.append(-numpy.ones(elem.nedges, dtype=int)) for iedge, belem in enumerate(elem.edges): belemcoords = belem.vertices edgekey = tuple(sorted(belemcoords)) try: jelem, jedge = edges.pop(edgekey) except KeyError: edges[edgekey] = ielem, iedge else: # TODO assert transformation equivalence connectivity[ielem][iedge] = jelem connectivity[jelem][jedge] = ielem return tuple(connectivity) @cache.property def boundary(self): elements = [elem.edge(iedge) for elem, ioppelems in zip(self, self.connectivity) for iedge in numpy.where(ioppelems == -1)[0]] return UnstructuredTopology(self.ndims-1, elements) @cache.property def interfaces(self): seen = set() elements = [] for ielem, ioppelems in enumerate(self.connectivity): elem = self.elements[ielem] for iedge, ioppelem in enumerate(ioppelems): if ioppelem == -1: continue try: seen.remove((ielem, iedge)) except KeyError: ioppedge = tuple(self.connectivity[ioppelem]).index(ielem) oppelem = self.elements[ioppelem] elements.append(elem.edge(iedge).withopposite(oppelem.edge(ioppedge), oriented=False)) seen.add((ioppelem, ioppedge)) assert not seen return UnstructuredTopology(self.ndims-1, elements)
[docs] def basis_bubble(self): 'bubble from vertices' assert self.ndims == 2 coeffs = numeric.const([[[1,-1, 0, 0], [-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [ 1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 1, 0, 0], [ 0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [ 0, 27,-27, 0], [0,-27, 0, 0], [0, 0, 0, 0]]]) nmap = [] dofmap = {} for ielem, elem in enumerate(self): assert isinstance(elem.reference, element.TriangleReference) assert elem.nverts == 3 dofs = numpy.empty(4, dtype=int) for i, v in enumerate(elem.vertices): dof = dofmap.get(v) if dof is None: dof = len(self) + len(dofmap) dofmap[v] = dof dofs[i] = dof dofs[3] = ielem nmap.append(numeric.const(dofs, copy=False)) ndofs = len(self)+len(dofmap) return function.polyfunc([coeffs]*len(self), nmap, ndofs, (elem.transform for elem in self), issorted=False)
def basis_spline(self, degree): assert degree == 1 return self.basis('std', degree)
[docs] def basis_discont(self, degree): 'discontinuous shape functions' assert numeric.isint(degree) and degree >= 0 coeffs = [] nmap = [] ndofs = 0 for elem in self: elemcoeffs = elem.reference.get_poly_coeffs('bernstein', degree=degree) coeffs.append(elemcoeffs) nmap.append(numeric.const(ndofs + numpy.arange(len(elemcoeffs)), copy=False)) ndofs += len(elemcoeffs) degrees = set(n-1 for c in coeffs for n in c.shape[1:]) return function.polyfunc(coeffs, nmap, ndofs, (elem.transform for elem in self), issorted=False)
def _basis_c0_structured(self, name, degree): 'C^0-continuous shape functions with lagrange stucture' assert numeric.isint(degree) and degree >= 0 if degree == 0: raise ValueError('Cannot build a C^0-continuous basis of degree 0. Use basis \'discont\' instead.') nlocaldofs = 0 elem_slices = [] coeffs = [] for elem in self: elem_coeffs = elem.reference.get_poly_coeffs(name, degree=degree) coeffs.append(elem_coeffs) elem_slices.append(slice(nlocaldofs, nlocaldofs+len(elem_coeffs))) nlocaldofs += len(elem_coeffs) dofmap = -numpy.ones([nlocaldofs], dtype=int) for ielem, elem in enumerate(self): # Loop over all neighbors of elem and merge dofs. for iedge, jelem in enumerate(self.connectivity[ielem]): if jelem < 0: continue jedge = self.connectivity[jelem].tolist().index(ielem) #if ielem < jelem or ielem == jelem and iedge < jedge: # continue idofs = elem.reference.get_edge_dofs(degree, iedge) verts = tuple(elem.edge(iedge).vertices) neighbor = self.elements[jelem] neighbor_verts = tuple(neighbor.edge(jedge).vertices) jdofs = neighbor.reference.get_edge_dofs(degree, jedge) #jdofs = jdofs[neighbor.reference.edges[jedge][1].get_dof_transpose_map(degree, tuple(map(neighbor_verts.index, verts)))] foo = neighbor.reference.edges[jedge][1].get_dof_transpose_map(degree, tuple(map(neighbor_verts.index, verts))) for idof, j in zip(idofs, foo): ridof = idof = elem_slices[ielem].start+idof # Resolve idof. while dofmap[ridof] >= 0: ridof = dofmap[ridof] # Resolve jdof. rjdof = jdof = elem_slices[jelem].start+jdofs[j] while dofmap[rjdof] >= 0: rjdof = dofmap[rjdof] if ridof != rjdof: dofmap[max(ridof,rjdof)] = min(ridof,rjdof) if ridof != idof: dofmap[idof] = min(ridof,rjdof) # Assign dof numbers. ndofs = 0 for i in range(len(dofmap)): if dofmap[i] < 0: dofmap[i] = ndofs ndofs += 1 else: dofmap[i] = dofmap[dofmap[i]] dofs = tuple(numeric.const(dofmap[s]) for s in elem_slices) return function.polyfunc(coeffs, dofs, ndofs, (elem.transform for elem in self), issorted=False)
[docs] def basis_lagrange(self, degree): 'lagrange shape functions' return self._basis_c0_structured('lagrange', degree)
[docs] def basis_bernstein(self, degree): 'bernstein shape functions' return self._basis_c0_structured('bernstein', degree)
basis_std = basis_bernstein
[docs]class UnionTopology(Topology): 'grouped topology' def __init__(self, topos, names=()): self._topos = tuple(topos) assert all(isinstance(topo, Topology) for topo in self._topos) self._names = tuple(names)[:len(self._topos)] assert all(isinstance(name,str) for name in self._names) assert len(set(self._names)) == len(self._names), 'duplicate name' ndims = self._topos[0].ndims assert all(topo.ndims == ndims for topo in self._topos) super().__init__(ndims) def getitem(self, item): topos = [topo if name == item else topo.getitem(item) for topo, name in itertools.zip_longest(self._topos, self._names)] return functools.reduce(operator.or_, topos, EmptyTopology(self.ndims)) def __or__(self, other): if not isinstance(other, UnionTopology): return UnionTopology(self._topos + (other,), self._names) return UnionTopology(self._topos[:len(self._names)] + other._topos + self._topos[len(self._names):], self._names + other._names) @cache.property def elements(self): elements = [] for trans, elems in util.gather((elem.transform, elem) for topo in self._topos for elem in topo): if len(elems) == 1: elements.append(elems[0]) else: refs = [elem.reference for elem in elems] while len(refs) > 1: # sweep all possible unions until a single reference is left nrefs = len(refs) iref = 0 while iref < len(refs)-1: for jref in range(iref+1, len(refs)): try: unionref = refs[iref] | refs[jref] except TypeError: pass else: refs[iref] = unionref del refs[jref] break iref += 1 assert len(refs) < nrefs, 'incompatible elements in union' unionref, = refs opposite = elems[0].opposite assert all(elem.opposite == opposite for elem in elems[1:]) elements.append(element.Element(unionref, trans, opposite, oriented=True)) return elements @property def refined(self): return UnionTopology([topo.refined for topo in self._topos], self._names)
[docs]class SubsetTopology(Topology): 'trimmed' def __init__(self, basetopo, refs, newboundary=None): if newboundary is not None: assert isinstance(newboundary, str) or isinstance(newboundary, Topology) and newboundary.ndims == basetopo.ndims-1 assert len(refs) == len(basetopo) self.refs = tuple(refs) self.basetopo = basetopo self.newboundary = newboundary super().__init__(basetopo.ndims) def getitem(self, item): return self.basetopo.getitem(item).subset(self.elements, strict=False) def __rsub__(self, other): if self.basetopo == other: refs = [elem.reference - ref for elem, ref in zip(self.basetopo, self.refs)] return SubsetTopology(self.basetopo, refs, ~self.newboundary if isinstance(self.newboundary,Topology) else self.newboundary) return super().__rsub__(other) def __or__(self, other): if not isinstance(other, SubsetTopology) or self.basetopo != other.basetopo: return super().__or__(other) refs = [ref1 | ref2 for ref1, ref2 in zip(self.refs, other.refs)] if all(elem.reference == ref for elem, ref in zip(self.basetopo, refs)): return self.basetopo return SubsetTopology(self.basetopo, refs) # TODO boundary @cache.property def connectivity(self): mask = numpy.array([bool(ref) for ref in self.refs] + [False]) # trailing false serves to map -1 to -1 renumber = numpy.cumsum(mask)-1 renumber[~mask] = -1 connectivity = tuple(tuple(renumber[ioppelems[iedge]] if edgeref and iedge < len(ioppelems) else -1 for iedge, edgeref in enumerate(ref.edge_refs)) for ref, ioppelems in zip(self.refs, self.basetopo.connectivity) if ref) return connectivity @cache.property def elements(self): return tuple(element.Element(ref, elem.transform, elem.opposite) for elem, ref in zip(self.basetopo, self.refs) if ref) @property def refined(self): elems = [child for elem in self for child in elem.children if child] return self.basetopo.refined.subset(elems, self.newboundary.refined if isinstance(self.newboundary,Topology) else self.newboundary, strict=True) @cache.property @log.title def boundary(self): brefs = [element.EmptyReference(self.ndims-1)] * len(self.basetopo.boundary) # subset of original boundary newbelems = [] # newly formed boundary elements connectivity = self.basetopo.connectivity elements = self.basetopo.elements baseboundary = self.basetopo.boundary for ielem, ioppelems in enumerate(connectivity): ref = self.refs[ielem] if not ref: continue elem = elements[ielem] newbelems.extend(element.Element(edge, elem.transform + (etrans,), elem.transform + (etrans.flipped,)) for etrans, edge in ref.edges[elem.reference.nedges:]) for iedge, ioppelem in enumerate(ioppelems): bref = ref.edge_refs[iedge] if not bref: continue if ioppelem == -1: index = baseboundary.edict[transform.canonical(elem.transform + (ref.edge_transforms[iedge],))] brefs[index] = bref # by construction, bref must be equal or subset of original else: ioppedge = tuple(connectivity[ioppelem]).index(ielem) oppref = self.refs[ioppelem] if oppref: bref -= oppref.edge_refs[ioppedge] if bref: newbelems.append(element.Element(bref, elem.transform + (ref.edge_transforms[iedge],), elements[ioppelem].edge(ioppedge).transform)) origboundary = SubsetTopology(baseboundary, brefs) trimboundary = OrientedGroupsTopology(self.newboundary if isinstance(self.newboundary,Topology) else self.basetopo.interfaces, newbelems) return UnionTopology([trimboundary, origboundary], names=[self.newboundary] if isinstance(self.newboundary,str) else []) @cache.property @log.title def interfaces(self): irefs = [element.EmptyReference(self.ndims-1)] * len(self.basetopo.interfaces) # subset of original interfaces connectivity = self.basetopo.connectivity elements = self.basetopo.elements baseinterfaces = self.basetopo.interfaces for ielem, ioppelems in enumerate(connectivity): ref = self.refs[ielem] if not ref: continue # edge is empty elem = elements[ielem] for iedge, ioppelem in enumerate(ioppelems): if ioppelem == -1: continue # edge lies on the boundary oppref = self.refs[ioppelem] if not oppref: continue # edge is empty index = baseinterfaces.edict.get(transform.canonical(elem.transform + (ref.edge_transforms[iedge],))) if index is None: continue # edge is not oriented with an interface; rely on connectivity to also yield flipped element ioppedge = tuple(connectivity[ioppelem]).index(ielem) irefs[index] = ref.edge_refs[iedge] & oppref.edge_refs[ioppedge] return SubsetTopology(baseinterfaces, irefs) @log.title def basis(self, name, *args, **kwargs): if isinstance(self.basetopo, HierarchicalTopology): warnings.warn('basis may be linearly dependent; a linearly indepent basis is obtained by trimming first, then creating hierarchical refinements') basis = self.basetopo.basis(name, *args, **kwargs) return self.prune_basis(basis)
[docs]class OrientedGroupsTopology(UnstructuredTopology): 'unstructured topology with undirected semi-overlapping basetopology' def __init__(self, basetopo, elems): self.basetopo = basetopo super().__init__(basetopo.ndims, elems) def getitem(self, item): elements = [] for elem in self.basetopo.getitem(item): try: ielem, tail = transform.lookup_item(elem.transform, self.edict) except KeyError: elem = elem.flipped try: ielem, tail = transform.lookup_item(elem.transform, self.edict) except KeyError: continue if tail: raise NotImplementedError ref = self.elements[ielem].reference & elem.reference elements.append(element.Element(ref, elem.transform, elem.opposite, oriented=True)) return UnstructuredTopology(self.ndims, elements)
[docs]class RefinedTopology(Topology): 'refinement' def __init__(self, basetopo): self.basetopo = basetopo super().__init__(basetopo.ndims) def getitem(self, item): return self.basetopo.getitem(item).refined @cache.property def elements(self): return tuple([child for elem in self.basetopo for child in elem.children]) @cache.property def boundary(self): return self.basetopo.boundary.refined
[docs]class TrimmedTopologyItem(Topology): 'trimmed topology item' def __init__(self, basetopo, refdict): self.basetopo = basetopo self.refdict = refdict super().__init__(basetopo.ndims) @cache.property def elements(self): elements = [] for elem in self.basetopo: ref = elem.reference & self.refdict[elem.transform] if ref: elements.append(element.Element(ref, elem.transform, elem.opposite, oriented=True)) return elements
[docs]class TrimmedTopologyBoundaryItem(Topology): 'trimmed topology boundary item' def __init__(self, btopo, trimmed, othertopo): self.btopo = btopo self.trimmed = trimmed self.othertopo = othertopo super().__init__(btopo.ndims) @cache.property def elements(self): belems = [elem for elem in self.trimmed if elem.opposite in self.btopo.edict] if self.othertopo: belems.extend(self.othertopo) return belems
[docs]class HierarchicalTopology(Topology): 'collection of nested topology elments' def __init__(self, basetopo, allelements, precise): 'constructor' assert not isinstance(basetopo, HierarchicalTopology) self.basetopo = basetopo self.allelements = tuple(allelements) if precise: self.elements = self.allelements super().__init__(basetopo.ndims) def getitem(self, item): return self.basetopo.getitem(item).hierarchical(self.allelements, precise=False) def hierarchical(self, elements, precise=False): return self.basetopo.hierarchical(elements, precise) @cache.property def elements(self): itemelems = [] for elem in self.allelements: try: ielem, tail = transform.lookup_item(elem.transform, self.basetopo.edict) except KeyError: continue itemelem = self.basetopo.elements[ielem] ref = itemelem.reference for trans in tail: index = ref.child_transforms.index(trans) ref = ref.child_refs[index] if not ref: break else: ref &= elem.reference if ref: itemelems.append(element.Element(ref, elem.transform, elem.opposite, oriented=True)) return itemelems @cache.property @log.title def levels(self): levels = [self.basetopo] for elem in self: try: ielem, tail = transform.lookup_item(elem.transform, self.basetopo.edict) except KeyError: raise Exception('element is not a refinement of basetopo') else: nrefine = len(tail) while nrefine >= len(levels): levels.append(levels[-1].refined) assert elem.transform in levels[nrefine].edict, 'element is not a refinement of basetopo' return tuple(levels) @cache.property def refined(self): elements = [child for elem in self for child in elem.children] return self.basetopo.hierarchical(elements, precise=True) @cache.property @log.title def boundary(self): 'boundary elements' basebtopo = self.basetopo.boundary edgepool = [edge for elem in self if transform.lookup(elem.transform, self.basetopo.border_transforms) for edge in elem.edges if edge is not None] belems = [] for edge in edgepool: # superset of boundary elements try: iedge, tail = transform.lookup_item(edge.transform, basebtopo.edict) except KeyError: pass else: opptrans = basebtopo.elements[iedge].opposite + tail belems.append(element.Element(edge.reference, edge.transform, opptrans, oriented=True)) return basebtopo.hierarchical(belems, precise=True) @cache.property @log.title def interfaces(self): 'interfaces' # Build a lookup table for level and element indices given elements in this # topology. elem_index_level = { elem: (ielem, ilevel) for ilevel, level in enumerate(self.levels) for ielem, elem in enumerate(level) } oriented = isinstance(self.basetopo, StructuredTopology) edict = self.edict interfaces = [] for elem in log.iter('elem', self): # Get `level`, element number at `level` of `elem`. ielem, ilevel = elem_index_level[elem] level = self.levels[ilevel] # Loop over neighbours of `elem`. for ielemedge, ineighbor in enumerate(level.connectivity[ielem]): if ineighbor < 0: # Not an interface. continue neighbor = level.elements[ineighbor] # Lookup `neighbor` (from the same `level` as `elem`) in this topology. head, tail = transform.lookup(neighbor.transform, edict) or (None, None) if not head: # `neighbor` not found, hence refinements of `neighbor` are present. # The interface of this edge will be added when we encounter the # refined elements. continue # Find the edge of `neighbor` between `neighbor` and `elem`. ineighboredge = tuple(level.connectivity[ineighbor]).index(ielem) if not tail and (ielem, ielemedge) > (ineighbor, ineighboredge): # `neighbor` itself, not a parent of, exists in this topology (`tail` # is empty). To make sure we add this interface only once we # continue here if the current element has a higher index (in # `level`) than the neighbor (or a higher edge number if the elements # are equal, which might occur when there is only one element in a # periodic dimension). continue # Create and add the interface between `elem` and `neighbor`. elemedge = elem.edges[ielemedge] neighboredge = neighbor.edges[ineighboredge] interfaces.append(element.Element(elemedge.reference, elemedge.transform, neighboredge.transform, oriented=oriented)) return UnstructuredTopology(self.ndims-1, interfaces)
[docs] @log.title def basis(self, name, *args, **kwargs): 'build hierarchical function space' # The law: a basis function is retained if all elements of self can # evaluate it through cascade, and at least one element of self can # evaluate it directly. # Procedure: per refinement level, track which basis functions have at # least one supporting element coinsiding with self ('touched') and no # supporting element finer than self ('supported'). dofs_coeffs = [] renumber = [] supports = [] length = 0 for topo in log.iter('level', self.levels): basis = topo.basis(name, *args, **kwargs) # shape functions for current level supported = numpy.ones(len(basis), dtype=bool) # True if dof is fully contained in self or parents touchtopo = numpy.zeros(len(basis), dtype=bool) # True if dof touches at least one elem in self (axes,func), = function.blocks(basis) dofmap, = axes if isinstance(func, function.Polyval): coeffs = func.coeffs assert coeffs.ndim == 1+self.ndims elif func.isconstant: assert func.ndim == 1 coeffs = func[(slice(None),*(_,)*self.ndims)] else: raise ValueError for elem in topo: trans = elem.transform idofs, = dofmap.eval(_transforms=(elem.transform, elem.opposite)) if trans in self.edict: touchtopo[idofs] = True elif transform.lookup(trans, self.edict): supported[idofs] = False support = supported & touchtopo supports.append(support) cumsum_support = numpy.cumsum(support) renumber.append(cumsum_support+(length-1)) length += cumsum_support[-1] dofs_coeffs.append(function.Tuple((dofmap, coeffs))) dofs = [] coeffs = [] transforms = tuple(sorted(elem.transform for elem in self)) for trans in transforms: hcoeffs = [] hdofs = [] ibase, tail = transform.lookup_item(trans, self.basetopo.edict) for ilevel in range(len(tail)+1): (idofs,), (icoeffs,) = dofs_coeffs[ilevel].eval(_transforms=(trans,)) isupport = supports[ilevel][idofs] if not isupport.any(): continue hdofs.extend(map(renumber[ilevel].__getitem__, idofs[isupport])) hcoeffs.extend(transform.transform_poly(tail[ilevel:], icoeffs[isupport])) dofs.append(numeric.const(hdofs)) coeffs.append(numeric.poly_stack(hcoeffs)) return function.polyfunc(coeffs, dofs, length, transforms, issorted=True)
[docs]class ProductTopology(Topology): 'product topology' def __init__(self, topo1, topo2): self.topo1 = topo1 self.topo2 = topo2 super().__init__(topo1.ndims+topo2.ndims) def __len__(self): return len(self.topo1) * len(self.topo2) @cache.property def structure(self): return self.topo1.structure[(...,)+(_,)*self.topo2.ndims] * self.topo2.structure @cache.property def elements(self): return (numpy.array(self.topo1.elements, dtype=object)[:,_] * numpy.array(self.topo2.elements, dtype=object)[_,:]).ravel() def __iter__(self): return self.elements.flat @property def refined(self): return self.topo1.refined * self.topo2.refined def refine(self, n): if numpy.iterable(n): assert len(n) == self.ndims else: n = (n,)*self.ndims return self.topo1.refine(n[:self.topo1.ndims]) * self.topo2.refine(n[self.topo1.ndims:]) def getitem(self, item): return self.topo1.getitem(item) * self.topo2 | self.topo1 * self.topo2.getitem(item) if isinstance(item, str) \ else self.topo1[item[:self.topo1.ndims]] * self.topo2[item[self.topo1.ndims:]] def basis(self, name, *args, **kwargs): def _split(arg): if isinstance(arg, (list,tuple)): assert len(arg) == self.ndims return arg[:self.topo1.ndims], arg[self.topo1.ndims:] else: return arg, arg splitargs = [_split(arg) for arg in args] splitkwargs = [(name,)+_split(arg) for name, arg in kwargs.items()] basis1, basis2 = function.bifurcate( self.topo1.basis(name, *[arg1 for arg1, arg2 in splitargs], **{name: arg1 for name, arg1, arg2 in splitkwargs}), self.topo2.basis(name, *[arg2 for arg1, arg2 in splitargs], **{name: arg2 for name, arg1, arg2 in splitkwargs})) return function.ravel(function.outer(basis1,basis2), axis=0) @cache.property def boundary(self): return self.topo1 * self.topo2.boundary + self.topo1.boundary * self.topo2 @cache.property def interfaces(self): return self.topo1 * self.topo2.interfaces + self.topo1.interfaces * self.topo2
[docs]class RevolutionTopology(Topology): 'topology consisting of a single revolution element' def __init__(self): self.elements = element.Element(element.RevolutionReference(), [transform.RootTrans('angle',(1,))]), self.boundary = EmptyTopology(ndims=0) super().__init__(ndims=1) def basis(self, name, *args, **kwargs): return function.asarray([1])
[docs]class MultipatchTopology(Topology): 'multipatch topology' Patch = collections.namedtuple('Patch', ['topo', 'verts', 'boundaries']) Patch.__qualname__ = 'MultipatchTopology.Patch'
[docs] @staticmethod def build_boundarydata(connectivity): 'build boundary data based on connectivity' boundarydata = [] for patch in connectivity: ndims = len(patch.shape) patchboundarydata = [] for dim, side in itertools.product(range(ndims), [-1, 0]): # ignore vertices at opposite face verts = numpy.array(patch) opposite = tuple({0:-1, -1:0}[side] if i == dim else slice(None) for i in range(ndims)) verts[opposite] = verts.max()+1 if len(set(verts.flat)) != 2**(ndims-1)+1: raise NotImplementedError('Cannot compute canonical boundary if vertices are used more than once.') # reverse axes such that lowest vertex index is at first position reverse = tuple(slice(None, None, -1) if i else slice(None) for i in numpy.unravel_index(verts.argmin(), verts.shape)) verts = verts[reverse] # transpose such that second lowest vertex connects to lowest vertex in first dimension, third in second dimension, et cetera k = [verts[tuple(1 if i == j else 0 for j in range(ndims))] for i in range(ndims)] transpose = tuple(sorted(range(ndims), key=k.__getitem__)) verts = verts.transpose(transpose) # boundarid boundaryid = tuple(verts[...,0].flat) patchboundarydata.append((boundaryid,dim,side,reverse,transpose)) boundarydata.append(tuple(patchboundarydata)) # TODO: boundary sanity checks return boundarydata
def __init__(self, patches): 'constructor' self.patches = tuple(self.Patch(*patch) for patch in patches) self._patchinterfaces = {} for patch in self.patches: for boundaryid, dim, side, reverse, transpose in patch.boundaries: self._patchinterfaces.setdefault(boundaryid, []).append((patch.topo, dim, side, reverse, transpose)) self._patchinterfaces = { boundaryid: tuple(data) for boundaryid, data in self._patchinterfaces.items() if len(data) > 1 } super().__init__(self.patches[0].topo.ndims) @cache.property def elements(self): return tuple(itertools.chain.from_iterable(patch.topo for patch in self.patches)) def getitem(self, key): for i in range(len(self.patches)): if key == 'patch{}'.format(i): return self.patches[i].topo else: return UnionTopology(patch.topo.getitem(key) for patch in self.patches)
[docs] def basis_spline(self, degree, patchcontinuous=True, knotvalues=None, knotmultiplicities=None): '''spline from vertices Create a spline basis with degree ``degree`` per patch. If ``patchcontinuous``` is true the basis is $C^0$-continuous at patch interfaces. ''' if knotvalues is None: knotvalues = {None: None} else: knotvalues, _knotvalues = {}, knotvalues for edge, k in _knotvalues.items(): if k is None: rk = None else: k = tuple(k) rk = k[::-1] if edge is None: knotvalues[edge] = k else: l, r = edge assert (l,r) not in knotvalues assert (r,l) not in knotvalues knotvalues[(l,r)] = k knotvalues[(r,l)] = rk if knotmultiplicities is None: knotmultiplicities = {None: None} else: knotmultiplicities, _knotmultiplicities = {}, knotmultiplicities for edge, k in _knotmultiplicities.items(): if k is None: rk = None else: k = tuple(k) rk = k[::-1] if edge is None: knotmultiplicities[edge] = k else: l, r = edge assert (l,r) not in knotmultiplicities assert (r,l) not in knotmultiplicities knotmultiplicities[(l,r)] = k knotmultiplicities[(r,l)] = rk missing = object() coeffs = [] dofmap = [] transforms = [] dofcount = 0 commonboundarydofs = {} for ipatch, patch in enumerate(self.patches): transforms.extend(elem.transform for elem in patch.topo) # build structured spline basis on patch `patch.topo` patchknotvalues = [] patchknotmultiplicities = [] for idim in range(self.ndims): left = tuple(0 if j == idim else slice(None) for j in range(self.ndims)) right = tuple(1 if j == idim else slice(None) for j in range(self.ndims)) dimknotvalues = set() dimknotmultiplicities = set() for edge in zip(patch.verts[left].flat, patch.verts[right].flat): v = knotvalues.get(edge, knotvalues.get(None, missing)) m = knotmultiplicities.get(edge, knotmultiplicities.get(None, missing)) if v is missing: raise 'missing edge' dimknotvalues.add(v) if m is missing: raise 'missing edge' dimknotmultiplicities.add(m) if len(dimknotvalues) != 1: raise 'ambiguous knot values for patch {}, dimension {}'.format(ipatch, idim) if len(dimknotmultiplicities) != 1: raise 'ambiguous knot multiplicities for patch {}, dimension {}'.format(ipatch, idim) patchknotvalues.append(next(iter(dimknotvalues))) patchknotmultiplicities.append(next(iter(dimknotmultiplicities))) patchcoeffs, patchdofmap, patchdofcount = patch.topo._basis_spline(degree, knotvalues=patchknotvalues, knotmultiplicities=patchknotmultiplicities) coeffs.extend(patchcoeffs) dofmap.extend(numeric.const(dofs+dofcount, copy=False) for dofs in patchdofmap) if patchcontinuous: # reconstruct multidimensional dof structure dofs = dofcount + numpy.arange(numpy.prod(patchdofcount), dtype=int).reshape(patchdofcount) for boundaryid, dim, side, reverse, transpose in patch.boundaries: # get patch boundary dofs and reorder to canonical form boundarydofs = dofs[reverse].transpose(transpose)[...,0].ravel() # append boundary dofs to list (in increasing order, automatic by outer loop and dof increment) commonboundarydofs.setdefault(boundaryid, []).append(boundarydofs) dofcount += numpy.prod(patchdofcount) if patchcontinuous: # build merge mapping: merge common boundary dofs (from low to high) pairs = itertools.chain(*(zip(*dofs) for dofs in commonboundarydofs.values() if len(dofs) > 1)) merge = {} for dofs in sorted(pairs): dst = merge.get(dofs[0], dofs[0]) for src in dofs[1:]: merge[src] = dst # build renumber mapping: renumber remaining dofs consecutively, starting at 0 remainder = set(merge.get(dof, dof) for dof in range(dofcount)) renumber = dict(zip(sorted(remainder), range(len(remainder)))) # apply mappings dofmap = tuple(numeric.const(tuple(renumber[merge.get(dof, dof)] for dof in v.flat), dtype=int).reshape(v.shape) for v in dofmap) dofcount = len(remainder) return function.polyfunc(coeffs, dofmap, dofcount, transforms, issorted=False)
[docs] def basis_discont(self, degree): 'discontinuous shape functions' bases = [patch.topo.basis('discont', degree=degree) for patch in self.patches] coeffs = [] dofs = [] ndofs = 0 for patch in self.patches: basis = patch.topo.basis('discont', degree=degree) (axes,func), = function.blocks(basis) patch_dofmap, = axes if isinstance(func, function.Polyval): patch_coeffs = func.coeffs assert patch_coeffs.ndim == 1+self.ndims elif func.isconstant: assert func.ndim == 1 patch_coeffs = func[(slice(None),*(_,)*self.ndims)] else: raise ValueError patch_coeffs_dofs = function.Tuple((patch_coeffs, patch_dofmap)) for elem in patch.topo: (elem_coeffs,), (elem_dofs,) = patch_coeffs_dofs.eval(_transforms=(elem.transform,)) coeffs.append(elem_coeffs) dofs.append(numeric.const(ndofs+elem_dofs, copy=False)) ndofs += len(basis) return function.polyfunc(coeffs, dofs, ndofs, (elem.transform for patch in self.patches for elem in patch.topo), issorted=False)
[docs] def basis_patch(self): 'degree zero patchwise discontinuous basis' npatches = len(self.patches) coeffs = [numeric.const(1, dtype=int).reshape(1, *(1,)*self.ndims)]*npatches dofs = numeric.const(range(npatches), dtype=int)[:,_] return function.polyfunc(coeffs, dofs, npatches, ((patch.topo.root,) for patch in self.patches), issorted=False)
@cache.property def boundary(self): 'boundary' subtopos = [] subnames = [] for i, patch in enumerate(self.patches): names = dict(zip(itertools.product(range(self.ndims), [0,-1]), patch.topo._bnames)) for boundaryid, dim, side, reverse, transpose in patch.boundaries: if boundaryid in self._patchinterfaces: continue subtopos.append(patch.topo.boundary[names[dim,side]]) subnames.append('patch{}-{}'.format(i, names[dim,side])) if len(subtopos) == 0: return EmptyTopology(self.ndims-1) else: return UnionTopology(subtopos, subnames) @cache.property def interfaces(self): '''interfaces Return a topology with all element interfaces. The patch interfaces are accessible via the group ``'interpatch'`` and the interfaces *inside* a patch via ``'intrapatch'``. ''' intrapatchtopo = EmptyTopology(self.ndims-1) if not self.patches else \ UnionTopology(patch.topo.interfaces for patch in self.patches) btopos = [] bconnectivity = [] for boundaryid, patchdata in self._patchinterfaces.items(): if len(patchdata) > 2: raise ValueError('Cannot create interfaces of multipatch topologies with more than two interface connections.') pairs = [] for topo, dim, side, reverse, transpose in patchdata: names = dict(zip(itertools.product(range(self.ndims), [0,-1]), topo._bnames)) # get structured set of boundary elements elems = topo.boundary[names[dim, side]].structure # add singleton axis elems = elems[tuple(_ if i == dim else slice(None) for i in range(self.ndims))] # apply canonical transformation elems = elems[reverse].transpose(transpose)[..., 0] shape = elems.shape pairs.append(elems.flat) # join element pairs elems = [ element.Element(elem_a.reference, elem_a.transform, elem_b.transform, oriented=True) for elem_a, elem_b in zip(*pairs) ] # create structured topology of joined element pairs bpatch = numpy.array(boundaryid).reshape((2,)*(self.ndims-1)) #btopos.append(StructuredTopology(numpy.array(elems).reshape(shape))) btopos.append(UnstructuredTopology(self.ndims-1, elems)) bconnectivity.append(bpatch) # create multipatch topology of interpatch boundaries interpatchtopo = MultipatchTopology(zip(btopos, bconnectivity, self.build_boundarydata(bconnectivity))) return UnionTopology((intrapatchtopo, interpatchtopo), ('intrapatch', 'interpatch')) @cache.property def refined(self): 'refine' return MultipatchTopology((patch.topo.refined, patch.verts, patch.boundaries) for patch in self.patches)
# UTILITY FUNCTIONS DimAxis = collections.namedtuple('DimAxis', ['i','j','isperiodic']) DimAxis.isdim = True BndAxis = collections.namedtuple('BndAxis', ['i','j','ibound','side']) BndAxis.isdim = False def common_refine(topo1, topo2): warnings.deprecation('common_refine(a, b) will be removed in future; use a & b instead') return topo1 & topo2 # vim:shiftwidth=2:softtabstop=2:expandtab:foldmethod=indent:foldnestmax=2