40分钟学 Go 语言高并发:【实战】分布式缓存系统
【实战课程】分布式缓存系统
一、整体架构设计
首先,让我们通过架构图了解分布式缓存系统的整体设计:
核心组件
组件名称 | 功能描述 | 技术选型 |
---|---|---|
负载均衡层 | 请求分发、节点选择 | 一致性哈希 |
缓存节点 | 数据存储、过期处理 | 内存存储 + 持久化 |
同步机制 | 节点间数据同步 | Pub/Sub + Gossip |
监控系统 | 性能监控、故障检测 | Prometheus + Grafana |
二、核心代码实现
1. 缓存节点实现
package dcache
import (
"context"
"encoding/json"
"sync"
"time"
)
// CacheItem 缓存项结构
type CacheItem struct {
Value interface{} `json:"value"`
Expiration int64 `json:"expiration"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
}
// CacheNode 缓存节点结构
type CacheNode struct {
nodeID string
items sync.Map
peers map[string]*CacheNode
peerLock sync.RWMutex
options *Options
}
// Options 配置选项
type Options struct {
DefaultExpiration time.Duration
CleanupInterval time.Duration
MaxItems int
}
// NewCacheNode 创建新的缓存节点
func NewCacheNode(nodeID string, opts *Options) *CacheNode {
node := &CacheNode{
nodeID: nodeID,
peers: make(map[string]*CacheNode),
options: opts,
}
// 启动清理过期项的定时任务
if opts.CleanupInterval > 0 {
go node.cleanupLoop()
}
return node
}
// Set 设置缓存项
func (n *CacheNode) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
item := &CacheItem{
Value: value,
CreatedAt: time.Now().UnixNano(),
UpdatedAt: time.Now().UnixNano(),
}
if expiration == 0 {
expiration = n.options.DefaultExpiration
}
if expiration > 0 {
item.Expiration = time.Now().Add(expiration).UnixNano()
}
n.items.Store(key, item)
// 通知其他节点更新
n.notifyPeers(ctx, key, item)
return nil
}
// Get 获取缓存项
func (n *CacheNode) Get(ctx context.Context, key string) (interface{}, bool) {
if value, exists := n.items.Load(key); exists {
item := value.(*CacheItem)
if item.Expiration > 0 && item.Expiration < time.Now().UnixNano() {
n.items.Delete(key)
return nil, false
}
return item.Value, true
}
return nil, false
}
// Delete 删除缓存项
func (n *CacheNode) Delete(ctx context.Context, key string) {
n.items.Delete(key)
// 通知其他节点删除
n.notifyPeersDelete(ctx, key)
}
// cleanupLoop 清理过期项的循环
func (n *CacheNode) cleanupLoop() {
ticker := time.NewTicker(n.options.CleanupInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
n.cleanup()
}
}
}
// cleanup 清理过期项
func (n *CacheNode) cleanup() {
now := time.Now().UnixNano()
n.items.Range(func(key, value interface{}) bool {
item := value.(*CacheItem)
if item.Expiration > 0 && item.Expiration < now {
n.items.Delete(key)
}
return true
})
}
// AddPeer 添加对等节点
func (n *CacheNode) AddPeer(peer *CacheNode) {
n.peerLock.Lock()
defer n.peerLock.Unlock()
n.peers[peer.nodeID] = peer
}
// RemovePeer 移除对等节点
func (n *CacheNode) RemovePeer(peerID string) {
n.peerLock.Lock()
defer n.peerLock.Unlock()
delete(n.peers, peerID)
}
// notifyPeers 通知其他节点更新
func (n *CacheNode) notifyPeers(ctx context.Context, key string, item *CacheItem) {
n.peerLock.RLock()
defer n.peerLock.RUnlock()
for _, peer := range n.peers {
go func(p *CacheNode) {
p.receiveUpdate(ctx, key, item)
}(peer)
}
}
// receiveUpdate 接收更新通知
func (n *CacheNode) receiveUpdate(ctx context.Context, key string, item *CacheItem) {
n.items.Store(key, item)
}
2. 一致性哈希实现
package dcache
import (
"hash/crc32"
"sort"
"sync"
)
type ConsistentHash struct {
circle map[uint32]string
sortedHashes []uint32
nodes map[string]bool
virtualNodes int
mu sync.RWMutex
}
func NewConsistentHash(virtualNodes int) *ConsistentHash {
return &ConsistentHash{
circle: make(map[uint32]string),
nodes: make(map[string]bool),
virtualNodes: virtualNodes,
}
}
// Add 添加节点
func (c *ConsistentHash) Add(node string) {
c.mu.Lock()
defer c.mu.Unlock()
if _, exists := c.nodes[node]; exists {
return
}
c.nodes[node] = true
for i := 0; i < c.virtualNodes; i++ {
hash := c.hashKey(fmt.Sprintf("%s-%d", node, i))
c.circle[hash] = node
}
c.updateSortedHashes()
}
// Remove 移除节点
func (c *ConsistentHash) Remove(node string) {
c.mu.Lock()
defer c.mu.Unlock()
if _, exists := c.nodes[node]; !exists {
return
}
delete(c.nodes, node)
for i := 0; i < c.virtualNodes; i++ {
hash := c.hashKey(fmt.Sprintf("%s-%d", node, i))
delete(c.circle, hash)
}
c.updateSortedHashes()
}
// Get 获取负责的节点
func (c *ConsistentHash) Get(key string) string {
c.mu.RLock()
defer c.mu.RUnlock()
if len(c.circle) == 0 {
return ""
}
hash := c.hashKey(key)
idx := c.searchForNode(hash)
return c.circle[c.sortedHashes[idx]]
}
// hashKey 计算哈希值
func (c *ConsistentHash) hashKey(key string) uint32 {
return crc32.ChecksumIEEE([]byte(key))
}
// updateSortedHashes 更新已排序的哈希值切片
func (c *ConsistentHash) updateSortedHashes() {
hashes := make([]uint32, 0, len(c.circle))
for k := range c.circle {
hashes = append(hashes, k)
}
sort.Slice(hashes, func(i, j int) bool {
return hashes[i] < hashes[j]
})
c.sortedHashes = hashes
}
// searchForNode 查找适合的节点
func (c *ConsistentHash) searchForNode(hash uint32) int {
idx := sort.Search(len(c.sortedHashes), func(i int) bool {
return c.sortedHashes[i] >= hash
})
if idx >= len(c.sortedHashes) {
idx = 0
}
return idx
}
3. 数据同步流程图
4. 故障恢复实现
package dcache
import (
"context"
"sync"
"time"
)
type FailureDetector struct {
nodes map[string]*NodeStatus
mu sync.RWMutex
checkInterval time.Duration
timeout time.Duration
}
type NodeStatus struct {
LastHeartbeat time.Time
IsAlive bool
Address string
}
func NewFailureDetector(checkInterval, timeout time.Duration) *FailureDetector {
fd := &FailureDetector{
nodes: make(map[string]*NodeStatus),
checkInterval: checkInterval,
timeout: timeout,
}
go fd.startDetection()
return fd
}
// RegisterNode 注册节点
func (fd *FailureDetector) RegisterNode(nodeID, address string) {
fd.mu.Lock()
defer fd.mu.Unlock()
fd.nodes[nodeID] = &NodeStatus{
LastHeartbeat: time.Now(),
IsAlive: true,
Address: address,
}
}
// UpdateHeartbeat 更新心跳
func (fd *FailureDetector) UpdateHeartbeat(nodeID string) {
fd.mu.Lock()
defer fd.mu.Unlock()
if node, exists := fd.nodes[nodeID]; exists {
node.LastHeartbeat = time.Now()
node.IsAlive = true
}
}
// startDetection 开始故障检测
func (fd *FailureDetector) startDetection() {
ticker := time.NewTicker(fd.checkInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fd.detectFailures()
}
}
}
// detectFailures 检测故障
func (fd *FailureDetector) detectFailures() {
fd.mu.Lock()
defer fd.mu.Unlock()
now := time.Now()
for nodeID, status := range fd.nodes {
if status.IsAlive && now.Sub(status.LastHeartbeat) > fd.timeout {
status.IsAlive = false
go fd.handleNodeFailure(nodeID)
}
}
}
// handleNodeFailure 处理节点故障
func (fd *FailureDetector) handleNodeFailure(nodeID string) {
// 1. 通知其他节点
fd.notifyPeers(nodeID)
// 2. 触发数据重平衡
fd.rebalanceData(nodeID)
}
// notifyPeers 通知其他节点
func (fd *FailureDetector) notifyPeers(failedNodeID string) {
fd.mu.RLock()
defer fd.mu.RUnlock()
for nodeID, status := range fd.nodes {
if nodeID != failedNodeID && status.IsAlive {
go fd.sendFailureNotification(status.Address, failedNodeID)
}
}
}
// sendFailureNotification 发送故障通知
func (fd *FailureDetector) sendFailureNotification(address, failedNodeID string) {
// 实现具体的通知逻辑
// 可以使用HTTP或gRPC等方式
}
// rebalanceData 重平衡数据
func (fd *FailureDetector) rebalanceData(failedNodeID string) {
// 1. 确定需要迁移的数据
// 2. 选择目标节点
// 3. 执行数据迁移
fd.mu.RLock()
defer fd.mu.RUnlock()
var aliveNodes []string
for nodeID, status := range fd.nodes {
if status.IsAlive && nodeID != failedNodeID {
aliveNodes = append(aliveNodes, nodeID)
}
}
if len(aliveNodes) == 0 {
return
}
// 触发数据迁移
go fd.migrateData(failedNodeID, aliveNodes)
}
// migrateData 迁移数据
func (fd *FailureDetector) migrateData(failedNodeID string, aliveNodes []string) {
// 实现数据迁移逻辑
}
// IsNodeAlive 检查节点是否存活
func (fd *FailureDetector) IsNodeAlive(nodeID string) bool {
fd.mu.RLock()
defer fd.mu.RUnlock()
if status, exists := fd.nodes[nodeID]; exists {
return status.IsAlive
}
return false
}
// GetAliveNodes 获取所有存活节点
func (fd *FailureDetector) GetAliveNodes() []string {
fd.mu.RLock()
defer fd.mu.RUnlock()
var aliveNodes []string
for nodeID, status := range fd.nodes {
if status.IsAlive {
aliveNodes = append(aliveNodes, nodeID)
}
}
return aliveNodes
}
三、缓存同步机制
1. 同步策略比较
策略 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
同步复制 | 强一致性 | 性能较差 | 对一致性要求高的场景 |
异步复制 | 性能好 | 最终一致性 | 对性能要求高的场景 |
半同步复制 | 折中方案 | 实现复杂 | 平衡性能和一致性 |
2. 数据同步实现
package dcache
import (
"context"
"encoding/json"
"sync"
"time"
)
type SyncManager struct {
node *CacheNode
syncInterval time.Duration
syncTimeout time.Duration
syncQueue chan *SyncTask
wg sync.WaitGroup
}
type SyncTask struct {
Key string
Value interface{}
Operation string // "set" or "delete"
Timestamp int64
}
func NewSyncManager(node *CacheNode, syncInterval, syncTimeout time.Duration) *SyncManager {
sm := &SyncManager{
node: node,
syncInterval: syncInterval,
syncTimeout: syncTimeout,
syncQueue: make(chan *SyncTask, 1000),
}
go sm.processSyncQueue()
return sm
}
// AddSyncTask 添加同步任务
func (sm *SyncManager) AddSyncTask(task *SyncTask) {
select {
case sm.syncQueue <- task:
// 成功添加到队列
default:
// 队列已满,记录错误日志
}
}
// processSyncQueue 处理同步队列
func (sm *SyncManager) processSyncQueue() {
ticker := time.NewTicker(sm.syncInterval)
defer ticker.Stop()
var tasks []*SyncTask
for {
select {
case task := <-sm.syncQueue:
tasks = append(tasks, task)
// 批量处理
if len(tasks) >= 100 {
sm.processBatch(tasks)
tasks = tasks[:0]
}
case <-ticker.C:
if len(tasks) > 0 {
sm.processBatch(tasks)
tasks = tasks[:0]
}
}
}
}
// processBatch 批量处理同步任务
func (sm *SyncManager) processBatch(tasks []*SyncTask) {
ctx, cancel := context.WithTimeout(context.Background(), sm.syncTimeout)
defer cancel()
// 按节点分组任务
tasksByNode := make(map[string][]*SyncTask)
for _, task := range tasks {
// 使用一致性哈希确定目标节点
node := sm.node.hashRing.Get(task.Key)
tasksByNode[node] = append(tasksByNode[node], task)
}
// 并发同步到各节点
var wg sync.WaitGroup
for node, nodeTasks := range tasksByNode {
wg.Add(1)
go func(node string, tasks []*SyncTask) {
defer wg.Done()
sm.syncToNode(ctx, node, tasks)
}(node, nodeTasks)
}
wg.Wait()
}
// syncToNode 同步到指定节点
func (sm *SyncManager) syncToNode(ctx context.Context, nodeID string, tasks []*SyncTask) {
// 1. 建立连接
conn, err := sm.getNodeConnection(nodeID)
if err != nil {
return
}
// 2. 发送同步数据
for _, task := range tasks {
switch task.Operation {
case "set":
conn.Set(ctx, task.Key, task.Value, 0)
case "delete":
conn.Delete(ctx, task.Key)
}
}
}
// getNodeConnection 获取节点连接
func (sm *SyncManager) getNodeConnection(nodeID string) (*CacheNode, error) {
// 实现节点连接池逻辑
return nil, nil
}
// StartFullSync 启动全量同步
func (sm *SyncManager) StartFullSync() {
sm.wg.Add(1)
go func() {
defer sm.wg.Done()
sm.fullSync()
}()
}
// fullSync 全量同步
func (sm *SyncManager) fullSync() {
// 1. 获取源节点数据快照
snapshot := sm.node.GetSnapshot()
// 2. 同步到目标节点
for key, value := range snapshot {
task := &SyncTask{
Key: key,
Value: value,
Operation: "set",
Timestamp: time.Now().UnixNano(),
}
sm.AddSyncTask(task)
}
}
// WaitForSync 等待同步完成
func (sm *SyncManager) WaitForSync() {
sm.wg.Wait()
}
四、监控指标
1. 核心监控指标
type Metrics struct {
// 缓存命中率
HitCount int64
MissCount int64
HitRate float64
// 容量指标
ItemCount int64
MemoryUsage int64
// 性能指标
AvgLatency float64
P95Latency float64
P99Latency float64
// 同步指标
SyncQueueSize int64
SyncLatency float64
SyncErrorCount int64
}
2. 监控指标表
指标类型 | 指标名称 | 说明 | 告警阈值 |
---|---|---|---|
性能指标 | avgLatency | 平均响应延迟 | >50ms |
性能指标 | p95Latency | 95分位延迟 | >100ms |
性能指标 | p99Latency | 99分位延迟 | >200ms |
命中率 | hitRate | 缓存命中率 | <80% |
容量指标 | memoryUsage | 内存使用率 | >80% |
同步指标 | syncQueueSize | 同步队列大小 | >1000 |
同步指标 | syncLatency | 同步延迟 | >1s |
错误指标 | errorCount | 错误次数 | >100/min |
五、优化建议
1. 性能优化
- 使用内存预分配
- 采用批量操作
- 实现多级缓存
- 使用零拷贝技术
2. 可靠性优化
- 实现故障自动转移
- 添加熔断机制
- 实现请求重试
- 数据定期备份
3. 监控优化
- 实现多维度监控
- 添加实时告警
- 收集详细日志
- 定期压测验证
六、实战建议
-
开发阶段:
- 充分测试各个组件
- 模拟各种故障场景
- 进行性能基准测试
- 编写完善的单元测试
-
部署阶段:
- 合理规划节点部署
- 配置监控告警
- 准备回滚方案
- 进行容量规划
-
运维阶段:
- 定期检查监控指标
- 及时处理告警信息
- 定期进行压力测试
- 制定应急预案
七、实战练习
-
基础练习:
- 实现简单的缓存节点
- 实现基本的数据同步
- 添加简单的监控指标
-
进阶练习:
- 实现完整的故障检测
- 实现数据自动迁移
- 实现多级缓存策略
-
高级练习:
- 优化同步性能
- 实现数据压缩
- 实现缓存预热
怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!