# Source code for nutils.cache

# Copyright (c) 2014 Evalf
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

"""
The cache module.
"""

from . import config, log, numeric, util
import os, sys, numpy, functools, inspect, builtins

def property(f):
  '''decorator for lazily evaluated, write-once attributes

  The decorated method is evaluated on first access and its result is stored
  in the instance ``__dict__`` under the method's name; later accesses return
  the stored value. The attribute may alternatively be assigned once before
  it is first read.

  Args
  ----
  f : callable which maps (self,) onto the attribute value

  Returns
  -------
  builtins.property
      Descriptor with a caching getter and a write-once setter.
  '''
  _self = object() # stored in place of the instance when the value is the instance itself
  _temp = object() # marks an evaluation that is currently in progress
  _name = f.__name__
  def property_getter(self):
    try:
      dictvalue = self.__dict__[_name]
    except KeyError:
      self.__dict__[_name] = _temp # placeholder for detection of cyclic dependencies
      try:
        value = f(self)
      except BaseException:
        # remove the placeholder so that a failed evaluation can be retried
        # instead of being misreported as a cyclic dependency on next access
        self.__dict__.pop(_name, None)
        raise
      self.__dict__[_name] = value if value is not self else _self
    else:
      assert dictvalue is not _temp, 'attribute {!r} requested during construction'.format(_name)
      value = dictvalue if dictvalue is not _self else self
    return value
  def property_setter(self, value):
    assert _name not in self.__dict__, 'attempting to set attribute {!r} twice'.format(_name)
    self.__dict__[_name] = value if value is not self else _self
  return builtins.property(fget=property_getter, fset=property_setter)

class Wrapper:
  '''function decorator that caches results by arguments

  Calls are normalized through the signature of the wrapped function, so that
  positional, keyword and defaulted arguments that denote the same call share
  a single cache entry. All arguments must be hashable.

  Attributes
  ----------
  func : the wrapped callable
  cache : dict mapping the normalized argument tuple onto the cached result
  count : total number of calls, including calls that raised
  signature : inspect.Signature of ``func``
  '''

  def __init__(self, func):
    self.func = func
    self.cache = {}
    self.count = 0
    self._hits = 0 # number of calls answered from the cache
    self.signature = inspect.signature(func)

  def __call__(self, *args, **kwargs):
    'return the cached result for the given arguments, evaluating func on a miss'
    self.count += 1
    bound = self.signature.bind(*args, **kwargs)
    bound.apply_defaults()
    args = bound.args
    assert not bound.kwargs
    try:
      value = self.cache[args]
    except KeyError:
      value = self.func(*args)
      self.cache[args] = value
    else:
      # counted explicitly: deriving hits from count - len(cache) would report
      # a call whose function raised (counted, never cached) as a hit
      self._hits += 1
    return value

  @builtins.property
  def hits(self):
    'number of calls that were answered from the cache'
    return self._hits
class WrapperCache:
  '''registry that hands out one Wrapper per function

  Indexing the registry with a function returns the Wrapper for that function,
  creating it on first use; ``stats`` summarizes the usage of all wrappers.
  '''

  def __init__(self):
    self.cache = {}

  def __getitem__(self, func):
    'return the Wrapper belonging to func, creating it if necessary'
    if func not in self.cache:
      self.cache[func] = Wrapper(func)
    return self.cache[func]

  @builtins.property
  def stats(self):
    'one-line, human readable summary of the cache effectivity'
    wrappers = list(self.cache.values())
    hits = sum(wrapper.hits for wrapper in wrappers)
    count = sum(wrapper.count for wrapper in wrappers)
    if not count:
      return 'not used'
    template = 'effectivity {}% (hit {}/{} calls over {} functions)'
    return template.format(100*hits/count, hits, count, len(wrappers))
class WrapperDummyCache:
  '''cache stand-in that performs no caching

  Supports the same indexing and ``stats`` access as WrapperCache, but
  indexing simply returns the function that was passed in.
  '''

  stats = 'caching disabled'

  def __getitem__(self, func):
    # hand the function back untouched: calls are evaluated directly
    return func
class ImmutableMeta(type):
  '''metaclass that shares instances between calls with equal arguments

  Every class created by this metaclass gets its own ``_cache`` dictionary
  that maps the normalized constructor arguments onto the instance that was
  built from them; calling the class with arguments that are already in the
  cache returns the existing instance.
  '''

  _cleanup_threshold = 1000 # number of new instances until next cleanup

  def __init__(cls, *args, **kwargs):
    'prepare the argument signature and the instance cache of a new class'
    super().__init__(*args, **kwargs)
    signature = inspect.signature(cls.__init__)
    # split off the first parameter of __init__ (the instance); the remaining
    # parameters form the signature used to normalize constructor calls
    param0, *params = signature.parameters.values()
    cls._signature = inspect.Signature(params)
    # (name, annotation) pairs; __call__ applies each annotation as a callable
    cls._annotations = [(param.name, param.annotation) for param in params if param.annotation != param.empty]
    cls._cache = {}
    # keep the __init__ as it is at this point; _new initializes through _init
    cls._init = cls.__init__
    if cls._annotations:
      # NOTE(review): util.enforcetypes is defined outside this file;
      # presumably it applies the annotations when __init__ is called
      # directly -- confirm
      cls.__init__ = util.enforcetypes(cls.__init__, signature)

  def __call__(cls, *args, **kwargs):
    'normalize the arguments and return the matching (possibly cached) instance'
    bound = cls._signature.bind(*args, **kwargs)
    bound.apply_defaults()
    # pass every annotated argument through its annotation
    for name, op in cls._annotations:
      bound.arguments[name] = op(bound.arguments[name])
    # the cache is keyed on positional arguments only
    assert not bound.kwargs
    return cls._new(*bound.args)

  def _new(cls, *args):
    'return the instance for the normalized args, constructing it on a cache miss'
    try:
      self = cls._cache[args]
    except KeyError:
      self = cls.__new__(cls)
      self._args = args
      self._hash = hash(args)
      self._init(*args)
      cls._cache[args] = self
      if len(cls._cache) > cls._cleanup_threshold:
        # rebuild the cache, keeping only entries with a reference count
        # above 4.
        # NOTE(review): presumably 4 is the number of references held by the
        # cache and by this expression itself, so that only instances that are
        # referenced elsewhere survive -- confirm; the count depends on the
        # exact form of this comprehension
        cls._cache = {key: value for key, value in cls._cache.items() if sys.getrefcount(value) > 4}
        # schedule the next cleanup relative to the number of retained entries
        cls._cleanup_threshold = ImmutableMeta._cleanup_threshold + len(cls._cache)
    return self

class Immutable(metaclass=ImmutableMeta):
  '''base class for objects that are identified by their constructor arguments

  Instances are created through ImmutableMeta, which stores the normalized
  constructor arguments in ``_args`` and their hash in ``_hash``.
  '''

  def __init__(self):
    'default constructor without arguments'
    pass

  def __reduce__(self):
    'reconstruct via the _new method of the class, called with the stored arguments'
    return self.__class__._new, self._args

  def __hash__(self):
    'hash of the constructor arguments, computed at construction'
    return self._hash

  # Ordering compares the tuple (class name, *constructor args); __le__ and
  # __ge__ cover the equal case by an identity test.

  def __lt__(self, other):
    return self is not other and (self.__class__.__name__,)+self._args < (other.__class__.__name__,)+other._args

  def __gt__(self, other):
    return self is not other and (self.__class__.__name__,)+self._args > (other.__class__.__name__,)+other._args

  def __le__(self, other):
    return self is other or (self.__class__.__name__,)+self._args < (other.__class__.__name__,)+other._args

  def __ge__(self, other):
    return self is other or (self.__class__.__name__,)+self._args > (other.__class__.__name__,)+other._args

  # __reduce__ is defined above; these two hooks raise if they are reached

  def __getstate__(self):
    raise Exception('getstate should never be called')

  def __setstate__(self, state):
    raise Exception('setstate should never be called')

  def __str__(self):
    'class name followed by the comma separated string forms of the arguments'
    return '{}({})'.format(self.__class__.__name__, ','.join(str(arg) for arg in self._args))

  def edit(self, op):
    'return an instance of the same class built from op applied to every argument'
    return self.__class__(*[op(arg) for arg in self._args])
class FileCache:
  '''cache that stores function results in a file

  The cache file is located in ``config.cachedir`` and named after the md5
  digest of the pickled constructor arguments. It holds the pickled arguments
  followed by one pickled result for every call, in call order. Results are
  matched to calls by position only: a program that continues from an
  existing cache must issue the same calls in the same order.

  The cache file is read with pickle, which can execute arbitrary code; the
  cache directory must therefore not be writable by untrusted parties.

  Args
  ----
  *args : picklable objects that identify the cache
  '''

  def __init__(self, *args):
    'open the cache file identified by args, creating it if necessary'
    import os, hashlib, pickle
    serial = pickle.dumps(args, -1)
    self.myhash = hash(serial)
    hexhash = hashlib.md5(serial).hexdigest()
    cachedir = config.cachedir
    # exist_ok avoids the race between testing for and creating the directory
    os.makedirs(cachedir, exist_ok=True)
    path = os.path.join(cachedir, hexhash)
    if not os.path.isfile(path) or config.recache:
      log.info('starting new cache:', hexhash)
      data = open(path, 'wb+')
      data.write(serial)
      data.flush()
    else:
      log.info('continuing from cache:', hexhash)
      data = open(path, 'ab+')
      try:
        data.seek(0)
        recovered_args = pickle.load(data)
        assert recovered_args == args, 'hash clash'
      except BaseException:
        # do not leave the file open if the existing cache cannot be used
        data.close()
        raise
    self.data = data

  def __call__(self, func, *args, **kwargs):
    '''return the next cached result, evaluating func if the cache is exhausted

    Args
    ----
    func : callable that produces the result if it is not cached
    *args, **kwargs : arguments passed on to func

    Returns
    -------
    The result loaded from the cache file, or the newly computed return value
    of ``func(*args, **kwargs)`` after it has been appended to the cache file.
    '''
    import pickle
    # the name is used for logging only; it plays no role in the lookup
    name = func.__name__ + ''.join(' {}'.format(arg) for arg in args) + ''.join(' {}={}'.format(*item) for item in kwargs.items())
    pos = self.data.tell()
    try:
      data = pickle.load(self.data)
    except EOFError:
      # no further records: compute the result and store it at this position
      data = func(*args, **kwargs)
      self.data.seek(pos)
      pickle.dump(data, self.data, -1)
      self.data.flush()
      msg = 'written to'
    else:
      msg = 'loaded from'
    log.info(msg, 'cache:', name, '[{}b]'.format(self.data.tell()-pos))
    return data

  def truncate(self):
    'discard all records beyond the current position in the cache file'
    log.info('truncating cache')
    self.data.truncate()

  def close(self):
    'close the underlying cache file; the cache cannot be used afterwards'
    self.data.close()

  def __enter__(self):
    return self

  def __exit__(self, *exc_info):
    self.close()

  def __hash__(self):
    return self.myhash
def replace(func):
  '''decorator that turns a per-object rule into a deep replacement function

  The decorated callable is consulted for every hashable object that is
  visited. A return value other than ``None`` replaces the object. If it
  returns ``None``, an Immutable object is rebuilt via its ``edit`` method,
  which applies the same procedure to each of its constructor arguments, and
  any other object is kept unchanged. Unhashable objects are kept unchanged
  without consulting the callable. Results are memoized per top-level call.

  Args
  ----
  func : callable which maps (obj, ...) onto replaced_obj, or None for no
      replacement at this level

  Returns
  -------
  callable
      Function of (target, ...) that returns target with all replacements
      applied; additional arguments are forwarded to ``func``.
  '''

  @functools.wraps(func)
  def wrapped(target, *funcargs, **funckwargs):
    memo = {}

    def visit(item):
      try:
        return memo[item]
      except TypeError: # unhashable
        return item
      except KeyError:
        pass
      result = func(item, *funcargs, **funckwargs)
      if result is None:
        if isinstance(item, Immutable):
          result = item.edit(visit)
        else:
          result = item
      memo[item] = result
      return result

    outcome = visit(target)
    del visit # clear the closure's reference to itself
    return outcome

  return wrapped
# vim:shiftwidth=2:softtabstop=2:expandtab:foldmethod=indent:foldnestmax=2