线程安全的可控制最大容量且带有过期时间的本地缓存
最近在公司优化一个接口的时候打算使用一个key-value结构的本地缓存。
需要实现的功能非常简单:
1、可以控制本地缓存的最大对象数量。
2、线程安全,防止发生OOM。
3、同时支持设置单个对象的过期时间。
面对这个需求,我的选择很多,有很多框架都做的非常好,但大多数框架对我来说都太重量级了,我希望一个简单高效的实现,所以我开发了一个简单的小工具,在这里可以分享下实现思路和开发当中遇到的问题以及解决办法。
首先是key-value的结构,我底层封装了一个Map来保存数据。然后要解决线程安全问题,所以我使用了
ConcurrentHashMap这个Map的实现,关于ConcurrentHashMap这个类网上有很多介绍,我在这里就不多说了。
接下来就是需要控制最多存储的对象数量,防止本地缓存太多对象(而且对象一直都被引用,还无法被GC)造成OOM,一开始我只是简单的使用比较size和最大值来判断是否还能添加对象,但是在后来的测试发现并发量非常高的时候会多存几倍的对象,为了保证性能我还不希望加锁或使用synchronized关键字,所以我选择了AtomicInteger这个原子类巧妙的处理添加和删除方法。这个问题的解决我会在代码里详细解释。
对于过期时间实现,我参考了Redis底层对于过期部分的实现,它分为主动和被动过期,前者更节约空间后者性能更好,为此我兼容了两者的优势,采取了主动+被动的方式,在查询时判断是否过期,如果过期,清除对象同时返回null(被动)。在添加元素时判断是否还有空间,如果有正常添加,如果没有触发全量过期,之后再判断是否有空间,有就添加,没有就返回添加失败(主动)。
具体代码如下
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 本地缓存
* 采用懒过期模式 在查询时才判断是否过期
* 在缓存满了的时候触发主动过期过期
*
* @author zhangmingxu ON 17:52 2019-05-20
**/
public class LocalCache {
private static final Logger logger = LoggerFactory.getLogger(LocalCache.class);
private static final int DEFAULT_MAX_NUMBER = 100; //默认最大缓存对象数
private final Map<String, Value> cache; //真正存储数据的Map,使用ConcurrentHashMap
private final int maxNumber; //最大对象数
//并发控制器,很重要,防止高并发下本地缓存对象个数超过maxNumber
private final AtomicInteger cur = new AtomicInteger(0);
/**
* 使用默认最大对象数100
*/
public LocalCache() {
this(DEFAULT_MAX_NUMBER);
}
public LocalCache(int maxNumber) {
this.maxNumber = maxNumber;
this.cache = new ConcurrentHashMap<>(maxNumber);
}
/**
* 添加
* 判断是否超过最大限制 如果超过触发一次全量过期
* 如果全量过期后还不够返回false
* 由于1 2 不是原子的所以需要使用单独的AtomicInteger来控制
*
* @param key 对应的key
* @param value 值
* @param expire 过期时间 单位毫秒
*/
public boolean put(String key, Object value, long expire) {
if (StringUtils.isBlank(key) || value == null || expire < 0) {
logger.error("本地缓存put参数异常");
return false;
}
if (!incr()) { //如果CAS增加失败直接返回添加失败
return false;
}
if (isOver()) { //判断是否需要过期
expireAll(); //触发一次全量过期
if (isOver()) { //二次检查
logger.error("本地缓存put时全量过期后还没有空间");
decr();
return false;
}
}
putValue(key, value, expire);
return true;
}
/**
* 获取时判断过期时间
* 在这里实现懒过期
*/
public Object get(String key) {
Value v = cache.get(key);
if (v == null) {
return null;
}
if (isExpired(v)) {
logger.info("本地缓存key={}已经过期", key);
removeValue(key);
return null;
}
return v.value;
}
/**
* 判断是否过期,实现很简单
*/
private boolean isExpired(Value v) {
long current = System.currentTimeMillis();
return current - v.updateTime > v.expire;
}
/**
* 扫描所有的对象对需要过期的过期
*/
private void expireAll() {
logger.info("开始过期本地缓存");
for (Map.Entry<String, Value> entry : cache.entrySet()) {
if (isExpired(entry.getValue())) {
removeValue(entry.getKey());
}
}
}
/**
* 为了保证cur和Map的size时刻保持一致这里我查询了put的注释及ConcurrentHashMap底层关于put的实现。
* 发现如果put方法返回的不是null说明存在覆盖操作,如果是覆盖那么Map的size其实没有变,因为我们添加之前把cur的值增加
* 上去了所以要在这里减下来。
*/
private void putValue(String key, Object value, long expire) {
Value v = new Value(System.currentTimeMillis(), expire, value);
if (cache.put(key, v) != null) {//存在覆盖 使得cur和map的size统一
decr();
}
}
/**
* 这里也是为了保证cur和Map的size时刻保持一致只有在remove方法返回的不是null时才证明真正有对象被删除了,才需要把
* cur减下来。这里出现remove返回为null是因为可能存在并发删除,两个线程删除同一个对象只能有一个删除成功(返回不是
* null),另一个(返回null)如果也减小了cur的值,会造成cur和Map的size不一致。
private void removeValue(String key) {
if (cache.remove(key) != null) { //真正删除成功了 使得cur和map的size统一
decr();
}
}
/**
* 这里很重要,原来我使用的是cache.size() >= maxNumber;
* 但是如果使用map本身的size方法会存在获取size和putValue方法不是原子的,
* 可能多个线程同时都判断那时候还没执行putValue方法,线程都认为还没有满,大家都执行了putValue方法造成数据太多
*/
private boolean isOver() {
return cur.get() > maxNumber;
}
private boolean incr() {
int c = cur.get();
return cur.compareAndSet(c, ++c);
}
/**
* 因为CAS不一定是一定成功的
* 所以这里通过循环保证成功
*/
private void decr() {
for (; ; ) {
int c = cur.get();
if (c == 0) {
logger.error("LocalCache decr cur is 0");
return;
}
if (cur.compareAndSet(c, --c)) {
return;
}
}
}
private static class Value {
private long updateTime; //更新时间
private long expire; //有效期
private Object value; //真正的对象
private Value(long updateTime, long expire, Object value) {
this.updateTime = updateTime;
this.expire = expire;
this.value = value;
}
}
}
这里面最关键的就是AtomicInteger cur这个对象,它在put方法参数校验通过之后就加1(虽然当时还没有putValue),使用这个操作让其他线程在后面的isOver方法中马上感知到数量变化,不会添加过多的对象。
保证cur的值和Map的Size时刻一致也很重要,并不是只要putValue了就加一(覆盖时虽然put进去了对象但是size不变),remove了就减一(并发删除同一个对象只能有一个成功,可能多减了),平常我们在使用Map的put和remove方法时往往忽略了它们的返回值,所以我建议大家仔细阅读源代码,加深理解。
并发测试代码如下:
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
LocalCache localCache = new LocalCache();
int n = 500; //线程数
int m = 100000; //每个线程put个数
CountDownLatch count = new CountDownLatch(n);
for (int i = 0; i < n; i++) {
new Thread(() -> {
for (int j = 0; j < m; j++) {
localCache.put(j + "", new Object(), 10);
}
count.countDown();
}).start();
}
count.await();
System.out.println("size:" + localCache.cache.size());
System.out.println("cur:" + localCache.cur);
System.out.println("耗时 " + (System.currentTimeMillis() - start));
}
原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 线程安全的可控制最大容量且带有过期时间的本地缓存
我之前也有这个需求,后来用的guava的缓存
我很喜欢这个,里面很多实用工具,我还做过一个基于Redis的BloomFilter,里面的很多算法就是从这个框架复制来的。哈哈哈
你考虑了很多我考虑不到的地方,值得学习
您好,看你写的很不错,我是搜狗的开发,正在招人。可以的话,可以加我qq 529227945,聊一下。
你好,最近没有换工作的打算。
抱歉
我随便列几个代码中存在的问题吧
1、put方法中的
if (!incr()) { //如果CAS增加失败直接返回添加失败
return false;
}
incr方法里的cas失败是并发时很正常的情况,直接将这种情况作失败处理,难道是说业务上不允许有并发出现?
2、put方法中的容量限定存在逻辑问题
假如当前容量为990,上限为1000,此时有20个线程同时调用put方法,同时执行完incr(),此时计数器为1100,isOver()将返回true,所以判定达到最大容量,然后同时执行到二次校验后的decr()处,计数器从1100减到990,20个线程全插入失败
结局是没有一个线程插入成功了,但此时实际容量却为990,还有10个空位,类似的问题就不一一列举了
3、性能问题
当插入数据失败时,却对计数器(竞争点)进行了一增一减两次更新,从逻辑上来讲,这是没必要的
过期清除(expireAll())采用整个map遍历,而且在每次插入失败就执行一次,简直丧心病狂
尽量不要去尝试发明轮子,能流行开来的开源工具,肯定是获得很多人的认可,没有太大问题的,而自己去发明一个轮子,则可能隐藏有各种问题,或意想不到的坑。
要高效且【正确】地实现你上面描述的需求,其实有一点小复杂了,而且你还尝试用非阻塞算法(非阻塞算法比以往的有锁算法更难以驾驭),我觉得这个时候,用别人的轮子才是正解。
之前我把这个博客发到其他论坛都没人回复我,搞得很无聊,在这里居然有这么多人评论,你还写了这么多,有时间我仔细看看。
还有一个问题,就是每一次因空间满了进行全量清除,如果只回收了少量的空间。然后再进行put操作,很快缓存又满了,然后触发全量清除,又是只回收少量的空间。这样,周而复始。导致全量清除的次数过于频繁。因此,我认为可以设计一个全量清除的最小间隔时间。
这个也可以后台起一个守护线程去清理 类似于Java的GC 业务线程不关注清理工作
扩展来看也可以把这个时间间隔变成可配置的 提供修改方法 可以随时修改
之前我把这个博客发到其他论坛都没人回复我,搞得很无聊,在这里居然有这么多人评论,你还写了这么多,有时间我仔细看看。
incr()不进行自旋,desr()需要自旋。请问你出于什么考虑?
还有,就是这个原子计数器,为什么不考虑当concurrenthashmap的put操作成功后,计数器值设为容器的元素个数。remove操作成功,计数器值也设成容器的元素个数。
这样就不需要对原子计数器进行自增与自减法。
另外,比如put()方法的实现,如果你不使用锁,无法满足并发操作的数据一致性。trytocatch的观点我基本认同。
第一点 incr不自旋是因为它失败了可以业务降级不再放到本地缓存 但是如果desr失败就会导致计数器和map的size不一样 计数器回比size越来越大
第二个问题因为我觉得incr必须发生在put之前 并且我觉得可能存在覆盖风险
put操作inc操作失败后立即返回false,并不会像你说的如果达到最大值会尝试触发清除操作