Python 内存缓存与生存时间

我有多个线程运行相同的进程,需要能够通知对方,有些东西不应该在接下来的 n 秒内工作,但如果他们这样做不是世界末日。

我的目标是能够向缓存传递一个字符串和一个 TTL,并能够以列表的形式获取缓存中的所有字符串。缓存可以保存在内存中,TTL 的时间不会超过20秒。

有人对如何实现这一点有什么建议吗?

97735 次浏览

You can use the expiringdict module:

The core of the library is ExpiringDict class which is an ordered dictionary with auto-expiring values for caching purposes.

In the description they do not talk about multithreading, so in order not to mess up, use a Lock.

Something like that ?

from time import time, sleep
import itertools
from threading import Thread, RLock
import signal




class CacheEntry():
def __init__(self, string, ttl=20):
self.string = string
self.expires_at = time() + ttl
self._expired = False


def expired(self):
if self._expired is False:
return (self.expires_at < time())
else:
return self._expired


class CacheList():
def __init__(self):
self.entries = []
self.lock = RLock()


def add_entry(self, string, ttl=20):
with self.lock:
self.entries.append(CacheEntry(string, ttl))


def read_entries(self):
with self.lock:
self.entries = list(itertools.dropwhile(lambda x:x.expired(), self.entries))
return self.entries


def read_entries(name, slp, cachelist):
while True:
print "{}: {}".format(name, ",".join(map(lambda x:x.string, cachelist.read_entries())))
sleep(slp)


def add_entries(name, ttl, cachelist):
s = 'A'
while True:
cachelist.add_entry(s, ttl)
print("Added ({}): {}".format(name, s))
sleep(1)
s += 'A'






if __name__ == "__main__":
signal.signal(signal.SIGINT, signal.SIG_DFL)


cl = CacheList()
print_threads = []
print_threads.append(Thread(None, read_entries, args=('t1', 1, cl)))
# print_threads.append(Thread(None, read_entries, args=('t2', 2, cl)))
# print_threads.append(Thread(None, read_entries, args=('t3', 3, cl)))


adder_thread = Thread(None, add_entries, args=('a1', 2, cl))
adder_thread.start()


for t in print_threads:
t.start()


for t in print_threads:
t.join()


adder_thread.join()

The OP is using python 2.7 but if you're using python 3, ExpiringDict mentioned in the accepted answer is currently, well, expired. The last commit to the github repo was June 17, 2017 and there is an open issue that it doesn't work with Python 3.5

As of September 1, 2020, there is a more recently maintained project cachetools.

pip install cachetools

from cachetools import TTLCache


cache = TTLCache(maxsize=10, ttl=360)
cache['apple'] = 'top dog'
...
>>> cache['apple']
'top dog'
... after 360 seconds...
>>> cache['apple']
KeyError exception raised

ttl is the time to live in seconds.

Regarding an expiring in-memory cache, for general purpose use, a common design pattern to typically do this is not via a dictionary, but via a function or method decorator. A cache dictionary is managed behind the scenes. As such, this answer somewhat complements the answer by User which uses a dictionary rather than a decorator.

The ttl_cache decorator in cachetools works a lot like functools.lru_cache, but with a time to live.

import cachetools.func


@cachetools.func.ttl_cache(maxsize=128, ttl=10 * 60)
def example_function(key):
return get_expensively_computed_value(key)




class ExampleClass:
EXP = 2


@classmethod
@cachetools.func.ttl_cache()
def example_classmethod(cls, i):
return i * cls.EXP


@staticmethod
@cachetools.func.ttl_cache()
def example_staticmethod(i):
return i * 3

In case you don't want to use any 3rd libraries, you can add one more parameter to your expensive function: ttl_hash=None. This new parameter is so-called "time sensitive hash", its the only purpose is to affect lru_cache.

For example:

from functools import lru_cache
import time




@lru_cache()
def my_expensive_function(a, b, ttl_hash=None):
del ttl_hash  # to emphasize we don't use it and to shut pylint up
return a + b  # horrible CPU load...




def get_ttl_hash(seconds=3600):
"""Return the same value withing `seconds` time period"""
return round(time.time() / seconds)




# somewhere in your code...
res = my_expensive_function(2, 2, ttl_hash=get_ttl_hash())
# cache will be updated once in an hour


I absolutely love the idea from @iutinvg, I just wanted to take it a little further; decouple it from having to know to pass the ttl into every function and just make it a decorator so you don't have to think about it. If you have django, py3, and don't feel like pip installing any dependencies, try this out.

import time
from django.utils.functional import lazy
from functools import lru_cache, partial, update_wrapper




def lru_cache_time(seconds, maxsize=None):
"""
Adds time aware caching to lru_cache
"""
def wrapper(func):
# Lazy function that makes sure the lru_cache() invalidate after X secs
ttl_hash = lazy(lambda: round(time.time() / seconds), int)()
        

@lru_cache(maxsize)
def time_aware(__ttl, *args, **kwargs):
"""
Main wrapper, note that the first argument ttl is not passed down.
This is because no function should bother to know this that
this is here.
"""
def wrapping(*args, **kwargs):
return func(*args, **kwargs)
return wrapping(*args, **kwargs)
return update_wrapper(partial(time_aware, ttl_hash), func)
return wrapper

Proving it works (with examples):

@lru_cache_time(seconds=10)
def meaning_of_life():
"""
This message should show up if you call help().
"""
print('this better only show up once!')
return 42




@lru_cache_time(seconds=10)
def multiply(a, b):
"""
This message should show up if you call help().
"""
print('this better only show up once!')
return a * b
    

# This is a test, prints a `.` for every second, there should be 10s
# between each "this better only show up once!" *2 because of the two functions.
for _ in range(20):
meaning_of_life()
multiply(50, 99991)
print('.')
time.sleep(1)

I know this is a little old, but for those who are interested in no third-party dependencies, this is a minor wrapper around the builtin functools.lru_cache (I noticed Javier's similar answer after writing this, but figured I post it anyway since this doesn't require Django):

import functools
import time




def time_cache(max_age, maxsize=128, typed=False):
"""Least-recently-used cache decorator with time-based cache invalidation.


Args:
max_age: Time to live for cached results (in seconds).
maxsize: Maximum cache size (see `functools.lru_cache`).
typed: Cache on distinct input types (see `functools.lru_cache`).
"""
def _decorator(fn):
@functools.lru_cache(maxsize=maxsize, typed=typed)
def _new(*args, __time_salt, **kwargs):
return fn(*args, **kwargs)


@functools.wraps(fn)
def _wrapped(*args, **kwargs):
return _new(*args, **kwargs, __time_salt=int(time.time() / max_age))


return _wrapped


return _decorator

And its usage:

@time_cache(10)
def expensive(a: int):
"""An expensive function."""
time.sleep(1 + a)




print("Starting...")
expensive(1)
print("Again...")
expensive(1)
print("Done")

NB this uses time.time and comes with all its caveats. You may want to use time.monotonic instead if available/appropriate.

You can also go for dictttl, which has MutableMapping, OrderedDict and defaultDict(list)

Initialize an ordinary dict with each key having a ttl of 30 seconds

data = {'a': 1, 'b': 2}
dict_ttl = DictTTL(30, data)

OrderedDict

data = {'a': 1, 'b': 2}
dict_ttl = OrderedDictTTL(30, data)

defaultDict(list)

dict_ttl = DefaultDictTTL(30)
data = {'a': [10, 20], 'b': [1, 2]}
[dict_ttl.append_values(k, v) for k, v in data.items()]

If you want to avoid third-party packages, you can add in a custom timed_lru_cache decorator, which builds upon the lru_cache decorator.

The below defaults to a 20-second lifetime and a max size of 128. Note that the entire cache expires after 20 seconds, not individual items.

from datetime import datetime, timedelta
from functools import lru_cache, wraps




def timed_lru_cache(seconds: int = 20, maxsize: int = 128):
def wrapper_cache(func):
func = lru_cache(maxsize=maxsize)(func)
func.lifetime = timedelta(seconds=seconds)
func.expiration = datetime.utcnow() + func.lifetime


@wraps(func)
def wrapped_func(*args, **kwargs):
if datetime.utcnow() >= func.expiration:
func.cache_clear()
func.expiration = datetime.utcnow() + func.lifetime


return func(*args, **kwargs)


return wrapped_func


return wrapper_cache

Then, just add @timed_lru_cache() above your function and you'll be good to go:

@timed_lru_cache()
def my_function():
# code goes here...

Yet Another Solution

How it works?

  1. The user function is cached using @functools.lru_cache with support for maxsize and typed parameters.
  2. The Result object records the function's return value and "death" time using time.monotonic() + ttl.
  3. The wrapper function checks the "death" time of the return value against time.monotonic() and if the current time exceeds the "death" time, then recalculates the return value with a new "death" time.

Show me the code:

from functools import lru_cache, wraps
from time import monotonic




def lru_cache_with_ttl(maxsize=128, typed=False, ttl=60):
"""Least-recently used cache with time-to-live (ttl) limit."""


class Result:
__slots__ = ('value', 'death')


def __init__(self, value, death):
self.value = value
self.death = death


def decorator(func):
@lru_cache(maxsize=maxsize, typed=typed)
def cached_func(*args, **kwargs):
value = func(*args, **kwargs)
death = monotonic() + ttl
return Result(value, death)


@wraps(func)
def wrapper(*args, **kwargs):
result = cached_func(*args, **kwargs)
if result.death < monotonic():
result.value = func(*args, **kwargs)
result.death = monotonic() + ttl
return result.value


wrapper.cache_clear = cached_func.cache_clear
return wrapper


return decorator

How to use it?

# Recalculate cached results after 5 seconds.
@lru_cache_with_ttl(ttl=5)
def expensive_function(a, b):
return a + b

Benefits

  1. Short, easy to review, and no PyPI install necessary. Relies only on the Python standard library, 3.7+.
  2. No annoying ttl=10 parameter needed at all callsites.
  3. Does not evict all items at the same time.
  4. Key/value pairs actually live for the given TTL value.
  5. Stores only one key/value pair per unique (*args, **kwargs) even when items expire.
  6. Works as a decorator (kudos to the Javier Buzzi answer and Lewis Belcher answer).
  7. Is thread safe.
  8. Benefits from the C-optimizations of CPython from python.org and is compatible with PyPy.

The accepted answer fails #2, #3, #4, #5, and #6.

Drawbacks

Does not proactively evict expired items. Expired items are evicted only when the cache reaches maximum size. If the cache will not reach the maximum size (say maxsize is None), then no evictions will ever occur.

However, only one key/value pair is stored in the cache per unique (*args, **kwargs) given to the cached function. So if there are only 10 different parameter combinations, then the cache will only ever have 10 entries at max.

Note that the "time sensitive hash" and "time salt" solutions are much worse because multiple key/value cache items with identical keys (but different time hashes/salts) are left in the cache.

I really liked @iutinvg solution due to its simplicity. However, I don't want to put an extra argument into every function, that I need to cache. So inspired by Lewis and Javiers answer, I thought a decorator would be best. However, I did not want to use 3rd party libraries (as Javier) and I thought I could improve upon Lewis solution. So this is what I came up with.

import time
from functools import lru_cache




def ttl_lru_cache(seconds_to_live: int, maxsize: int = 128):
"""
Time aware lru caching
"""
def wrapper(func):


@lru_cache(maxsize)
def inner(__ttl, *args, **kwargs):
# Note that __ttl is not passed down to func,
# as it's only used to trigger cache miss after some time
return func(*args, **kwargs)
return lambda *args, **kwargs: inner(time.time() // seconds_to_live, *args, **kwargs)
return wrapper

My solution use a lambda to get fewer lines of code and integer floor division (//) so no casting to int is required.

Usage

@ttl_lru_cache(seconds_to_live=10)
def expensive(a: int):
"""An expensive function."""
time.sleep(1 + a)




print("Starting...")
expensive(1)
print("Again...")
expensive(1)
print("Done")

Note: With these decorators, you should never set maxsize=None, because the cache would then grow to infinity over time.

Somebody took some work to put it into a python package, see https://github.com/vpaliy/lru-expiring-cache.

Well, I've been mislead by the other answers (which don't really address the question), so this might not be the best tool. Still,

from lru import LruCache


cache = LruCache(maxsize=10, concurrent=True)


def producer(key: str, value = True, TTL = 20):
cache.add(key = key, value = value, expires = TTL)


def consumer():
remaining_items = cache.items()
# Alternatively, iterate over available items until you find one not in the cache
return remaining_items




producer("1", TTL = 1)
producer("5", TTL = 3)
print(consumer())  ## 1, 5


time.sleep(2)
print(consumer())  ## 5


time.sleep(2)
print(consumer())  ## nothing

To my surprise, it keeps a ('Concurrent', 'True') entry when running in concurrent mode.