当前位置: 首页 > article >正文

【gRPC】clientPool 客户端连接池简单实现与go案例

什么是 gRPC 客户端连接池?

  • 在 gRPC 中,创建和维护一个到服务器的连接是非常消耗资源的(比如 TCP 连接建立和 TLS 握手)。

  • 而在高并发场景下,如果每次请求都创建新的连接,不仅会导致性能下降,还可能耗尽系统资源。

  • 因此,客户端连接池的作用是复用一定数量的连接,提高资源利用率和性能。


gRPC 客户端连接池的原理

  1. 连接复用,池子里的连接使用时取出,用完放回
  2. 控制连接数,可以固定数量或动态调整,防止建太多连接
  3. 并发安全

先展示一个基于sync.pool创建的clientPool

  • 实际上,企业不推荐使用sync包里的无锁机制,
  • 因为sync包里的无锁设计适用于高并发,短暂资源的情况,
  • 而gRPC本身设计初衷是客户端连接是长生命周期,需要稳定管理的资源,与sync.pool的特性不完全匹配
因此为了更好实现,可以自己加锁设计,或者使用第三方库这里举例github:go-grpc-pool

type ClientPool interface {
	Get() *grpc.ClientConn
	Put(conn *grpc.ClientConn)
}

type clientPool struct {
	pool sync.Pool
}

func GetPool(target string, opts ...grpc.DialOption) (ClientPool, error) {
	return &clientPool{
		pool: sync.Pool{
			New: func() any {
				conn, err := grpc.Dial(target, opts...)
				if err != nil {
					log.Println(err)
					return nil
				}
				return conn
			},
		},
	}, nil
}

func (c *clientPool) Get() *grpc.ClientConn {
	conn := c.pool.Get().(*grpc.ClientConn)
	if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
		conn.Close()
		conn = c.pool.New().(*grpc.ClientConn)
	}
	return conn
}

func (c *clientPool) Put(conn *grpc.ClientConn) {
	if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
		conn.Close()
		return
	}
	c.pool.Put(conn)
}

自己加锁设计

package main

import (
	"log"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

// ClientPool 定义接口
type ClientPool interface {
	Get() (*grpc.ClientConn, error)
	Put(conn *grpc.ClientConn)
	Close()
}

// clientPool 是 ClientPool 的实现
type clientPool struct {
	mu          sync.Mutex
	connections chan *grpc.ClientConn
	maxSize     int
	idleTimeout time.Duration
	target      string
	opts        []grpc.DialOption
	closed      bool
}

// NewClientPool 创建一个新的客户端连接池
func NewClientPool(target string, maxSize int, idleTimeout time.Duration, opts ...grpc.DialOption) (ClientPool, error) {
	if maxSize <= 0 {
		return nil, ErrInvalidMaxSize
	}

	pool := &clientPool{
		connections: make(chan *grpc.ClientConn, maxSize),
		maxSize:     maxSize,
		idleTimeout: idleTimeout,
		target:      target,
		opts:        opts,
	}

	// 预填充池
	for i := 0; i < maxSize; i++ {
		conn, err := pool.createConnection()
		if err != nil {
			return nil, err
		}
		pool.connections <- conn
	}

	return pool, nil
}

// createConnection 创建新连接
func (p *clientPool) createConnection() (*grpc.ClientConn, error) {
	conn, err := grpc.Dial(p.target, p.opts...)
	if err != nil {
		return nil, err
	}
	return conn, nil
}

// Get 从连接池获取一个连接
func (p *clientPool) Get() (*grpc.ClientConn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return nil, ErrPoolClosed
	}

	select {
	case conn := <-p.connections:
		// 检查连接状态
		if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
			conn.Close()
			return p.createConnection()
		}
		return conn, nil
	default:
		// 如果没有空闲连接,尝试创建新的连接
		return p.createConnection()
	}
}

// Put 将连接放回池中
func (p *clientPool) Put(conn *grpc.ClientConn) {
	if conn == nil {
		return
	}

	// 检查连接状态
	if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
		conn.Close()
		return
	}

	select {
	case p.connections <- conn:
		// 放回池中
	default:
		// 如果池已满,直接关闭连接
		conn.Close()
	}
}

// Close 关闭连接池
func (p *clientPool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return
	}

	p.closed = true
	close(p.connections)

	for conn := range p.connections {
		conn.Close()
	}
}

// 错误定义
var (
	ErrInvalidMaxSize = log.New("invalid max size")
	ErrPoolClosed     = log.New("connection pool is closed")
)

// 示例使用
func main() {
	pool, err := NewClientPool("localhost:50051", 10, time.Minute, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("Failed to create pool: %v", err)
	}

	conn, err := pool.Get()
	if err != nil {
		log.Fatalf("Failed to get connection: %v", err)
	}

	// 使用连接
	// client := pb.NewYourServiceClient(conn)

	// 放回连接
	pool.Put(conn)

	// 程序退出时关闭连接池
	pool.Close()
}

http://www.kler.cn/a/506684.html

相关文章:

  • Spring Boot 动态表操作服务实现
  • 具体场景的 MySQL 与 redis 数据一致性设计
  • 【WPS】【WORDEXCEL】【VB】实现微软WORD自动更正的效果
  • HunyuanVideo 文生视频模型实践
  • 4G DTU赋能智能配电环网柜通信运维管理
  • 【leetcode21】344.反转字符串
  • Go语言之路————条件控制:if、for、switch
  • Oracle EBS GL定期盘存WIP日记账无法过账数据修复
  • Go语言封装加解密包(AES/DES/RSA)
  • Sprint Boot教程之五十八:动态启动/停止 Kafka 监听器
  • 说说Babylon.js中scene.deltaTime的大坑
  • 如何异地远程访问本地部署的Web-Check实现团队远程检测与维护本地站点
  • 《DeepSeek V3:重新定义AI大模型的效率与成本》
  • Qt实现防止程序多次运行
  • Java学习教程,从入门到精通,JDBC数据库连接语法知识点及案例代码(92)
  • Outlook 无网络连接[2604] 错误解决办法
  • python批量doc转pdf调用提示库未注册
  • 华北水利水电大学第十届ACM/ICPC程序设计新生赛题解
  • Django Admin 实战:实现 ECS 集群批量同步功能
  • 【6】Word:海名公司文秘❗
  • SuperMap iClient3D for Cesium立体地图选中+下钻特效
  • 【Docker】使用Dev Container进行开发
  • HTML 中的 Window 和 Document 介绍
  • 【Uniapp-Vue3】manifest.json配置
  • 前后端分离开发心得
  • 十分钟带汝入门大数据开发语言Scala