跳到主要内容

Redis缓存使用

Redis引入

导入redis

<!-- 引入redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置:

spring:
redis:
host: localhost
port: 6379

测试:

@Autowired
StringRedisTemplate stringRedisTemplate;

@Test
public void testStringRedisTemplate(){
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
ops.set("hello", "world"+ UUID.randomUUID().toString());
String hello = ops.get("hello");
System.out.println("redxis中保存的值是:"+hello);
}

修改获取三级分类菜单,使用redis做缓存:

@Override
public Map<String, List<Catelog2Vo>> getCatalogJsonFromRedis() {
//1.加入缓存 给缓存中放json字符串,方便以后使用
String catalogJson = stringRedisTemplate.opsForValue().get("catalogJson");
if (StringUtils.isEmpty(catalogJson)) {
//缓存中没有数据,查询数据库
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
//转为json放入缓存中
String jsonString = JSON.toJSONString(catalogJsonFromDb);
stringRedisTemplate.opsForValue().set("catalogJson", jsonString);
return catalogJsonFromDb;
}
Map<String, List<Catelog2Vo>> result = JSON.parseObject(
catalogJson,
new TypeReference<Map<String, List<Catelog2Vo>>>() {});
return result;
}

一开始压力测试没问题,时间久了会报错:

image-20240131083244036

这是因为springboot2.0以后默认使用lettuce作为操作redis的客户端。它使用netty进行网络通信。 lettucel的bug导致nettyi堆外内存溢出-Xmx300m;netty如果没有指定堆外内存,默认使用-Xmx300m 可以通过-Dio.netty.maxDirectMemoryi进行设置 解决方案:不能使用-Dio.netty.maxDirectMemory只去调大堆外内存。

  • 升级lettuce客户端。
  • 或者切换使用jedis
<!-- 引入redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>

缓存穿透

缓存穿透是指在缓存中找不到需要的数据,导致每次请求都要查询数据库或其他存储系统,从而影响性能。这通常发生在请求的键值对在存储系统中不存在,但被频繁地查询。

为了解决缓存穿透问题,可以考虑以下方法:

  1. 空值缓存: 当查询数据库或存储系统后,如果发现数据不存在,可以将这个空值也存入缓存,但设置一个较短的过期时间,防止频繁查询。
  2. 布隆过滤器: 使用布隆过滤器来快速判断一个键值是否存在于缓存中。这样可以在缓存层面快速拦截掉那些明显不存在于存储系统中的请求。
  3. 预热缓存: 在系统启动时或数据更新时,可以通过预热缓存来提前将热门数据加载到缓存中,减少冷启动时的缓存穿透问题。
  4. 限制频繁查询: 对于频繁查询但不会经常变化的数据,可以考虑在缓存层面添加限制,例如采用缓存击穿的防护机制,防止大量请求同时穿透到存储系统。

缓存雪崩

缓存雪崩是指缓存中大量的缓存数据在同一时间失效或过期,导致大量的请求直接访问底层存储系统,从而导致存储系统负载激增,影响系统性能。

为了避免缓存雪崩,可以采取以下一些措施:

  1. 过期时间随机化: 设置缓存数据的过期时间时,可以考虑添加一些随机因素,防止大量缓存在同一时刻过期,减缓对底层系统的冲击。
  2. 持久化缓存: 对于一些重要的缓存数据,可以考虑使用永不过期或较长时间的过期时间,确保即使发生缓存失效,系统也能够继续提供服务。
  3. 分布式锁: 在缓存失效时,可以使用分布式锁来保证只有一个线程或节点可以重新加载缓存,防止大量请求同时击穿。
  4. 多级缓存: 使用多级缓存架构,将缓存数据分布在不同的缓存层级中,即使某一层缓存失效,其他层仍然可以提供部分数据,减轻雪崩效应。
  5. 异步加载: 缓存的异步加载机制可以在缓存失效时,通过异步任务去加载缓存,而不是同步地直接访问底层存储系统,从而减少对底层系统的冲击。

缓存击穿

缓存击穿是指某个缓存键对应的数据在缓存中不存在,但多个并发请求同时请求这个不存在的数据,导致请求穿透到底层存储系统,增加了系统负载。

为了避免缓存击穿,可以采取以下措施:

  1. 缓存预加载: 在系统启动时或数据更新时,可以通过预加载缓存来将热门数据加载到缓存中,避免在请求到来时才去加载。这样可以减少对底层系统的冲击。
  2. 使用互斥锁: 在查询缓存时,可以使用互斥锁来保证只有一个线程或请求可以进行缓存的查询操作。这样可以防止多个请求同时穿透到底层存储系统。
  3. 缓存穿透检测: 在缓存层面可以添加一些检测机制,判断某个键是否存在于缓存中,如果不存在,可以通过一定的策略(如设置一个临时的占位值)防止多个请求同时穿透。
  4. 设置短暂的缓存过期时间: 对于一些不常变化的数据,可以设置一个较短的缓存过期时间,以保证缓存数据能够及时更新,降低缓存失效的概率。
  5. 使用分布式锁: 在缓存失效时,可以使用分布式锁来保证只有一个线程或节点可以重新加载缓存,防止多个请求同时穿透。

使用本地锁

    public Map<String, List<Catelog2Vo>> getCatalogJsonFromDb() {
//只要是同一把锁,就能锁住所有的线程
//synchronized (this):SpringBoot所有的组件在容器中都是单例的
//todo 本地锁 synchronized ,JUC锁 Lock ,在分布式情况下,使用分布式锁 zookeeper redis
synchronized (this){
//得到锁以后,再去缓存中确定是否有数据
String catalogJson = stringRedisTemplate.opsForValue().get("catalogJson");
if (!StringUtils.isEmpty(catalogJson)) {
//缓存中有数据,直接返回
Map<String, List<Catelog2Vo>> result = JSON.parseObject(
catalogJson,
new TypeReference<Map<String, List<Catelog2Vo>>>() {});
return result;
}
//查出剩余结果。。。

String jsonString = JSON.toJSONString(parentCid);
stringRedisTemplate.opsForValue().set("catalogJson", jsonString,1, TimeUnit.HOURS);
return parentCid;
}
}

问题:

本地锁只会锁住这台机器,但是分布式系统下面有多台机器,在高并发情况下吗,每台机器还是会都查询一次数据库

分布式锁

set key value nx:NX -- Only set the key if it does not already exist.

public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//1.占分布式锁
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
if (lock) {
//加锁成功
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
//删除锁
stringRedisTemplate.delete("lock");
return dataFromDb;
} else {
//加锁失败,重试 自旋
return getCatalogJsonFromDbWithRedisLock();
}
}

这样写的问题是:在getDataFromDb中如果出现异常,则不会删除锁,导致出现死锁,解决办法,设置过期时间

public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//1.占分布式锁
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
if (lock) {
//加锁成功
//设置过期时间,防止死锁
stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
//删除锁
stringRedisTemplate.delete("lock");
return dataFromDb;
} else {
//加锁失败,重试 自旋
return getCatalogJsonFromDbWithRedisLock();
}
}

这样设置过期时间,问题是还没执行到设置过期时间的时候,出现问题,导致死锁,因此抢锁和设置过期时间应该是一个原子操作

    public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//1.占分布式锁
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111", 30, TimeUnit.SECONDS);
if (lock) {
//加锁成功
//设置过期时间,防止死锁
// stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
//删除锁
stringRedisTemplate.delete("lock");
return dataFromDb;
} else {
//加锁失败,重试 自旋
return getCatalogJsonFromDbWithRedisLock();
}
}

删除锁的时候问题,业务代码非常耗时,例如达到了50s,但是锁的过期时间只有30s,此时就会自动释放锁,导致其他线程进来,当第一个线程执行完的时候,他会去释放第二个线程的锁

解决办法,使用UUID作为值,保证只能删除自己的锁

    public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//1.占分布式锁
String uuid= UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
if (lock) {
//加锁成功
//设置过期时间,防止死锁
// stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
String lockValue = stringRedisTemplate.opsForValue().get("lock");
//删除锁
if (uuid.equals(lockValue)){
stringRedisTemplate.delete("lock");
}
return dataFromDb;
} else {
//加锁失败,重试 自旋
return getCatalogJsonFromDbWithRedisLock();
}
}

问题:由于获取值+对比成功删除=原子操作,当对比值成功的时候,将要删除锁的时候,锁过期了,这时候别人进来了,又会删除别人的锁,

解决办法,使用lua脚本

    public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//1.占分布式锁
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
if (lock) {
Map<String, List<Catelog2Vo>> dataFromDb;
try {
dataFromDb = getDataFromDb();
} finally {
//使用lua脚本解锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
}
return dataFromDb;
} else {
//加锁失败,重试 自旋
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
return getCatalogJsonFromDbWithRedisLock();
}
}

至此,问题解决,接下来会使用别人封装好的工具(redisson)来操作

Redisson

<!-- 以后使用Redisson作为所有分布式锁 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>

配置:https://github.com/redisson/redisson/wiki/2.-Configuration

@Configuration
public class MyRedissonConfig {
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
Config config = new Config();
config.useSingleServer().setAddress("localhost:6379");
return Redisson.create(config);
}
}

分布式锁:

@GetMapping("/hello")
@ResponseBody
public String hello() {
RLock lock = redissonClient.getLock("my-lock");
lock.lock(); //阻塞式等待,默认加的锁都是30s时间
//1.锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期被删掉
//2.加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁也会在30s后自动删除
try {
System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
Thread.sleep(30000);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
return "hello";
}

Redisson有看门狗机制

最佳实战,指定过期时间,不使用续期

lock.lock(10, TimeUnit.SECONDS);

如果指定了过期时间,自动解锁时间一定要大于业务执行时间,因为不会自动续期

读写锁,保证可以读到最新数据:

@GetMapping("/write")
@ResponseBody
public String writeValue() {
RReadWriteLock lock = redissonClient.getReadWriteLock("my-lock");
String s="";
RLock rLock = lock.writeLock();
try {
rLock.lock();
s= UUID.randomUUID().toString();
System.out.println("写锁加锁成功,执行业务..." + Thread.currentThread().getId());
Thread.sleep(30000);
stringRedisTemplate.opsForValue().set("writeValue",s);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
rLock.unlock();
}
return s;
}

@GetMapping("/read")
@ResponseBody
public String readValue() {
RReadWriteLock lock = redissonClient.getReadWriteLock("my-lock");
String s="";
RLock rLock = lock.readLock();
try {
rLock.lock();
s= stringRedisTemplate.opsForValue().get("writeValue");
System.out.println("读锁加锁成功,执行业务..." + Thread.currentThread().getId());
Thread.sleep(30000);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
rLock.unlock();
}
return s;
}

闭锁,可以用于分布式限流:

@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redissonClient.getCountDownLatch("door");
door.trySetCount(5);
door.await();
return "放假了...";
}

@GetMapping("/gogogo/{id}")
@ResponseBody
public String gogogo(@RequestBody String id) {
RCountDownLatch door = redissonClient.getCountDownLatch("door");
door.countDown();
return id;
}

信号量,可以做限流

@GetMapping("/park")
@ResponseBody
public String park() throws Exception{
RSemaphore park = redissonClient.getSemaphore("park");
park.acquire();;//获取一个信号,获取一个值-1
return "ok";
}

@GetMapping("/go")
@ResponseBody
public String go() throws Exception{
RSemaphore park = redissonClient.getSemaphore("park");
park.release();;//归还一个信号,获取一个值+1
return "ok";
}

使用redisson修改上面获取三级分类的业务代码:

public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedissonLock() {
//1.锁的名字要有业务名字区分
RLock lock = redissonClient.getLock("catalogJson-lock");
lock.lock();
Map<String, List<Catelog2Vo>> dataFromDb;
try {
dataFromDb = getDataFromDb();
} finally {
lock.unlock();
}
return dataFromDb;
}

缓存数据一致性

双写模式:

image-20240131120604855

失效模式:

image-20240131121548251

我们系统的一致性解决方案: 1、缓存的所有数据都有过期时间,数据过期下一次查询触发主动更新 2、读写数据的时候,加上分布式的读写锁。经常写,经常读

SpringCache

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>

配置:

spring.cache.type=redis

几个注解:

  • Cacheable保存缓存
  • CacheEvict删除
  • CachePut更新
  • Caching组合以上多个操作
  • CacheConfig共享缓存

开启缓存

@EnableCaching
@SpringBootApplication
@MapperScan("com.cxk.gulimall.product.dao")
@EnableDiscoveryClient
@EnableFeignClients(basePackages = "com.cxk.gulimall.product.feign")
public class GulimallProductApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallProductApplication.class, args);
}
}

使用:

   //每一个需要缓存的数据我们都来指定要放到哪个名字的缓存中
@Cacheable({"category"})//代表当前方法的结果需要缓存,如果缓存中有,方法不用调用,如果缓存中没有,会调用方法,最后将方法的结果放入缓存
@Override
public List<CategoryEntity> getLevel1Categories() {
// long l = System.currentTimeMillis();
List<CategoryEntity> categoryEntities = this.baseMapper.selectList(new LambdaQueryWrapper<CategoryEntity>()
.eq(CategoryEntity::getParentCid, 0));
// System.out.println("消耗时间:"+(System.currentTimeMillis() - l));
return categoryEntities;
}

key默认自动生成,缓存名字::SimpleKey

默认的value值,默认使用jdk序列化机制,将序列化后存储redis

默认时间-1

自定义:

  • 指定生产的缓存使用key
  • 指定存活时间,配置文件中修改ttl spring.cache.redis.time-to-live=60000
  • 数据保存为json格式

配置为json格式

@Configuration
@EnableConfigurationProperties(CacheProperties.class)
public class MyCacheConfig {

@Bean
RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
config=config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
config=config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
//将配置文件中的所有配置都生效
CacheProperties.Redis redisProperties = cacheProperties.getRedis();
if (redisProperties.getTimeToLive() != null) {
config=config.entryTtl(redisProperties.getTimeToLive());
}
if (redisProperties.getKeyPrefix() != null) {
config=config.prefixKeysWith(redisProperties.getKeyPrefix());
}
if (!redisProperties.isCacheNullValues()) {
config=config.disableCachingNullValues();
}
if (!redisProperties.isUseKeyPrefix()) {
config=config.disableKeyPrefix();
}
return config;
}
}

修改后删除缓存,CacheEvict:

@CacheEvict(value = {"category"},key = "'getLevel1Categories'")
@Override
@Transactional
public void updateCascade(CategoryEntity category) {
this.updateById(category);
if (!StringUtils.isEmpty(category.getName())) {
categoryBrandRelationService.updateCategory(category.getCatId(), category.getName());
//TODO 同步更新其他关联表的数据
}
}

删除多个缓存:

@Caching(evict = {
@CacheEvict(value = {"category"},key = "'getLevel1Categories'"),
@CacheEvict(value = {"category"},key = "'getCatalogJson'"),
})

SpringCache的不足

读模式:

  • 缓存穿透,查询null数据,解决:缓存空数据spring.cache.redis.cache-null-values=true
  • 缓存击穿,大量并发查一个数据,解决:加锁,默认是无加锁的
  • 缓存雪崩,大量的key同时过期,解决,加随机时间spring.cache.redis.time-to-live=60000