当前位置: 首页 > article >正文

通知服务刷新本地缓存的实现方案

为了提升接口性能,对于那种读频繁但写不频繁的数据,我们可以在应用本地缓存一份数据,当接口收到请求时就可以读取本地数据来降低响应时间。如果是单机部署的服务,当数据更新时可以同时刷新缓存数据;但对于集群部署的服务,就要通知所有服务刷新本地缓存。对于通知服务去刷新缓存的功能,下面介绍几种实现方案。
本地缓存数据可以使用map实现,也可以使用成熟的缓存框架实现,比如guava、caffeine等,如果使用map实现缓存数据,我们还要考虑缓存数据过期的相关处理逻辑,而不论是guava还是caffeine框架,他们都已经实现了相关的操作,所以还是建议大家不要重复造轮子了,直接使用成熟的方案。
示例讲解本地缓存选择使用caffeine作为缓存框架,所以第一步就是将caffeine封装成bean供其他地方使用:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

/**
 * @Author xingo
 * @Date 2024/10/24
 */
@Slf4j
@Configuration
public class CaffeineConfig {

    /**
     * 缓存配置
     * @return
     */
    @Bean
    public Cache<String, Object> caffeineCache() {
        return Caffeine.newBuilder()
                .initialCapacity(1024)   // 初始化容量
                .maximumSize(65536)      // 最大容量
                .expireAfterWrite(Duration.ofMinutes(30))   // 键过期时间:在写入后的30分钟
                .removalListener(new RemovalListener<String, Object>() {  // 监听数据移除
                    @Override
                    public void onRemoval(@Nullable String key, @Nullable Object data, @NonNull RemovalCause removalCause) {
                        log.info("缓存键移除|{}", key);
                    }
                })
                .build();
    }
}

再定义两个接口,分别用于获取缓存中的数据和刷新缓存数据:

import com.github.benmanes.caffeine.cache.Cache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author xingo
 * @Date 2024/10/24
 */
@RestController
public class CacheController {

    @Autowired
    private Cache<String, Object> caffeineCache;

    /**
     * 获取缓存中的数据
     */
    @GetMapping("/get/cache")
    public String getCache(String key) {
        Object val = caffeineCache.getIfPresent(key);
        return "val = " + val;
    }

    /**
     * 刷新缓存中的数据
     */
    @GetMapping("/refresh/cache")
    public String refreshCache(String key, String val) {
        Object val1 = caffeineCache.getIfPresent(key);
        System.out.println("刷新前数据|" + key + "|" + val1);
        caffeineCache.invalidate(key);
        caffeineCache.put(key, val);
        val1 = caffeineCache.getIfPresent(key);
        System.out.println("刷新后数据|" + key + "|" + val1);

        return "ok";
    }
}

基础工作已经准备完成,接下来展示各个实现方案:

一、使用redis的订阅发布功能

redis 的订阅发布功能可以实现对监听了某个通道的所有服务发送通知,只需要服务订阅某个通道即可。将要刷新的数据发送到这个通道,所有服务都会接收到。
要使用该功能,首先在应用中创建一个监听某个通道的客户端:

import com.github.benmanes.caffeine.cache.Cache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

/**
 * 监听redis发布订阅消息
 *
 * @Author xingo
 * @Date 2024/10/24
 */
@Slf4j
@Configuration
@Component
public class RedisConsumer {

    @Autowired
    private Cache<String, Object> caffeineCache;

    public static final String refreshChannel = "refresh-channel";

    /**
     * 注册监听器
     */
    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.addMessageListener(listenerRefreshLocalCache(), new ChannelTopic(refreshChannel));
        return container;
    }

    /**
     * 处理监听到的数据
     */
    @Bean
    public MessageListenerAdapter listenerRefreshLocalCache() {
        return new MessageListenerAdapter(new MessageListener() {

            @Override
            public void onMessage(Message message, byte[] pattern) {
                try {
                    String topic = new String(message.getChannel());
                    String body = new String(message.getBody());
                    log.info("通知刷新本地缓存|{}|{}", topic, body);
                    String[] arr = body.split("\\|");

                    caffeineCache.invalidate(arr[0]);
                    caffeineCache.put(arr[0], arr[1]);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

当要刷新数据时,将数据发送到redis服务的某个通道:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xingo.config.RedisConsumer;

/**
 * @Author xingo
 * @Date 2024/10/24
 */
@RestController
public class RedisController {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @GetMapping("/redis/refresh")
    public String redisRefresh(String key, String val) {
        redisTemplate.convertAndSend(RedisConsumer.refreshChannel, key + "|" + val);
        return "ok";
    }
}

上面两步就完成了本地缓存刷新的功能。

二、使用zookeeper的子节点监听功能

使用zookeeper的子节点监听功能也可以实现数据刷新,实现方案是:服务启动时注册一个事件监听器,监听某个节点的子节点创建事件;当监听到节点创建时,就获取节点下的数据,使用这个数据刷新本地缓存。这里创建的节点还需要注意节点的删除,否则节点会一直增加最终导致zookeeper服务内存不足的隐患,在zookeeper的高版本中已经支持节点的ttl时间设置,要使用ttl功能,需要在zk配置文件zoo.cfg中添加配置:

extendedTypesEnabled=true

添加这个参数后,重启zk服务。再创建一个watch监听子节点状态:

import com.github.benmanes.caffeine.cache.Cache;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @Author xingo
 * @Date 2024/10/24
 */
@Slf4j
@Component
public class ZkClient {

    public static final String refreshNode = "/refresh-node";

    @Value("${zookeeper.url}")
    private String url;

    @Getter
    private CuratorFramework client;

    @Autowired
    private Cache<String, Object> caffeineCache;

    @PostConstruct
    public void init() {
        client = CuratorFrameworkFactory.builder()
                .connectString(url)
                .sessionTimeoutMs((int) TimeUnit.MINUTES.toMillis(5))
                .connectionTimeoutMs((int) TimeUnit.SECONDS.toMillis(30))
                .retryPolicy(new ExponentialBackoffRetry(3000, 5))
                .build();
        client.start();

        this.listener();
    }

    /**
     * 监听子节点
     */
    public void listener() {
        // 创建对象
        PathChildrenCache cache = new PathChildrenCache(client, refreshNode, true);

        // 添加监听
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                    try {
                        String data = new String(event.getData().getData());
                        log.info("通知刷新本地缓存|{}", data);
                        String[] arr = data.split("\\|");

                        caffeineCache.invalidate(arr[0]);
                        caffeineCache.put(arr[0], arr[1]);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                    log.info("子节点删除|{}", event.getData().getPath());
                }
            }
        });

        // 开启监听
        try {
            cache.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void destroy() {
        client.close();
    }
}

当要刷新本地的缓存数据,只需要在 /refresh-node 下创建一个顺序节点,并把要刷新的数据放到节点中:

import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xingo.config.ZkClient;

/**
 * @Author xingo
 * @Date 2024/10/24
 */
@RestController
public class ZkController {

    @Autowired
    private ZkClient zkClient;

    @GetMapping("/zk/refresh")
    public String redisRefresh(String key, String val) throws Exception {
        String data = key + "|" + val;
        zkClient.getClient().create().withTtl(5).creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL).forPath(ZkClient.refreshNode + "/node", data.getBytes());
        return "ok";
    }
}

zk节点列表

三、使用SpringCloud接口完成刷新

使用SpringCloud刷新,首先通过注册中心获取到所有服务的连接信息,通过请求http接口刷新数据。实现也是非常简单:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import java.util.List;

/**
 * @Author xingo
 * @Date 2024/10/24
 */
@RestController
public class ClientController {

    @Autowired
    private DiscoveryClient discoveryClient;
    @Autowired
    private RestTemplate restTemplate;

    @GetMapping("/clients")
    public String getClients(String serverName) {
        if (serverName == null || serverName.trim().isEmpty()) {
            return "fail";
        }

        List<String> services = discoveryClient.getServices();
        if (services != null) {
            for (String service : services) {
                List<ServiceInstance> instances = discoveryClient.getInstances(service);
                if (instances != null) {
                    for (ServiceInstance instance : instances) {
                        String serviceId = instance.getServiceId();
                        String uri = instance.getUri().toString();

                        if (serverName.equalsIgnoreCase(serviceId)) {
                            String url = uri + "/refresh/cache?key=testkey&val=testval";
                            String rs = restTemplate.getForObject(url, String.class);
                            System.out.println("发起刷新请求|" + url + "|" + rs);
                        }
                    }
                }
            }
        }

        return "ok";
    }
}
四、使用定时任务

在项目中启动一个定时任务,定时检查数据变更,用变更的数据刷新本地缓存,这种实现方式可能存在延时,通过数据库+定时任务方式就可以实现该方案:

首先在数据库中创建表:

CREATE TABLE `local_cache`  (
  `id` int NOT NULL AUTO_INCREMENT,
  `key` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `val` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `create_time` datetime NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

当要刷新某个本地缓存的数据时,只需要向表中插入一条数据,定时任务检查表中数据,检测到数据就刷新对应的键值:

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.*;
import org.xingo.model.LocalCache;

import java.time.LocalDateTime;
import java.util.List;

/**
 * @Author xingo
 * @Date 2024/10/24
 */
@Mapper
public interface LocalCacheMapper extends BaseMapper<LocalCache> {

    @Insert("insert into local_cache(`key`, `val`, `create_time`) values(#{key}, #{val}, now())")
    int addCache(@Param(value = "key") String key, @Param(value = "val") String val);

    @Select("select id, `key`, `val`, `create_time` as createTime from local_cache")
    List<LocalCache> getCaches();

    @Delete("delete from local_cache where `create_time` <= #{dateTime}")
    int removeCaches(@Param(value = "dateTime") LocalDateTime dateTime);
}

定义定时任务检查表数据:

import com.github.benmanes.caffeine.cache.Cache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.xingo.mapper.LocalCacheMapper;
import org.xingo.model.LocalCache;

import java.time.LocalDateTime;
import java.util.List;

/**
 * @Author xingo
 * @Date 2024/10/24
 */
@Slf4j
@EnableScheduling
@Component
public class LocalCacheJob {

    @Autowired
    private Cache<String, Object> caffeineCache;
    @Autowired
    private LocalCacheMapper localCacheMapper;

    @Scheduled(cron="0/3 * * * * ?")
    public void run() {
        List<LocalCache> datas = localCacheMapper.getCaches();

        if(datas != null) {
            for (LocalCache data : datas) {
                log.info("定时刷新本地缓存|{}", data);

                caffeineCache.invalidate(data.getKey());
                caffeineCache.put(data.getKey(), data.getVal());
            }
        }

        LocalDateTime time = LocalDateTime.now().plusSeconds(-10);
        localCacheMapper.removeCaches(time);
    }
}

当要刷新某个数据,只需要向表中插入数据:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xingo.mapper.LocalCacheMapper;

/**
 * @Author xingo
 * @Date 2024/10/24
 */
@RestController
public class DbController {

    @Autowired
    private LocalCacheMapper localCacheMapper;

    @GetMapping("/db/refresh")
    public String redisRefresh(String key, String val) {
        localCacheMapper.addCache(key, val);
        return "ok";
    }
}
五、使用配置中心

通过监听配置中心某个配置的刷新来实现本地缓存的刷新,已nacos注册中心为例,创建一个bean监听配置中心某个配置的变化:

import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.nacos.api.config.listener.Listener;
import com.github.benmanes.caffeine.cache.Cache;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * @Author xingo
 * @Date 2024/10/24
 */
@Slf4j
@Component
public class NacosListener {

    @Autowired
    private Cache<String, Object> caffeineCache;
    @Autowired
    private NacosConfigManager nacosConfigManager;

    @PostConstruct
    public void init() throws Exception {
        nacosConfigManager.getConfigService().addListener("listener.yaml", "DEFAULT_GROUP", new Listener() {
            @Override
            public Executor getExecutor() {
                return Executors.newSingleThreadExecutor();
            }

            @Override
            public void receiveConfigInfo(String body) {
                log.info("通知刷新本地缓存|{}", body);
                String[] arr = body.split("\\|");

                caffeineCache.invalidate(arr[0]);
                caffeineCache.put(arr[0], arr[1]);
            }
        });
    }

}

通过代码动态发布配置信息到配置中心:

import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.nacos.api.exception.NacosException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xingo.mapper.LocalCacheMapper;

/**
 * @Author xingo
 * @Date 2024/10/24
 */
@RestController
public class NacosController {

    @Autowired
    private NacosConfigManager nacosConfigManager;

    @GetMapping("/nacos/refresh")
    public String redisRefresh(String key, String val) throws NacosException {
        nacosConfigManager.getConfigService().publishConfig("listener.yaml", "DEFAULT_GROUP", key + "|" + val);

        return "ok";
    }
}

nacos配置

但是这种方式不推荐,多个线程同时写的时候,存在并发问题。

六、消息队列实现

这种方式跟redis实现类似,当我们向消息队列某个主题发送消息时,这条消息会广播给所有监听该主题的服务。比如使用RocketMQ的广播消费功能。


http://www.kler.cn/news/364117.html

相关文章:

  • 爬虫结合项目实战
  • 使用docker-compose搭建redis7集群-3主3从
  • 《计算机视觉》—— 基于 dlib 库的方法将两张人脸图片进行换脸
  • 075_基于springboot的万里学院摄影社团管理系统
  • 程序员:数字时代的先锋
  • 后端C++
  • sql-labs靶场第二十一关测试报告
  • 【ChatGPT】如何通过实例提升 ChatGPT 的回答质量
  • 技术成神之路:设计模式(二十三)解释器模式
  • 介绍 TensorFlow 的基本概念和使用场景(AI生成仅供参考)
  • 读数据工程之道:设计和构建健壮的数据系统19数据存储系统 (下)
  • 基于neo4j的疫情信息管理系统
  • 【WebSocket实战】——创建项目初始架构
  • Linux 设备树在 i.MX6U 上的应用与详解
  • 基于 Python 的自然语言处理系列(43):Question Answering
  • 架构设计(17)大数据框架Hadoop与基础架构CDH
  • 又是一年 1024
  • Python酷库之旅-第三方库Pandas(167)
  • 鸿蒙原生 证书 打包到真机
  • 使用docker-compose部署一个springboot项目(包含Postgres\redis\Mongo\Nginx等环境)
  • STL标准容器库
  • 【华为HCIP实战课程十七】OSPF的4类及5类LSA详解,网络工程师
  • nginx------HTTP模块配置详解
  • 什么是虚拟线程?Java 中虚拟线程的介绍与案例演示
  • 【Unity实战笔记】第二一 · 基于状态模式的角色控制——以UnityChan为例
  • ArcGIS计算落入面图层中的线的长度或面的面积