【已上线】C++ mysql连接池
目录
- 1 为什么需要连接池
- 2 实现原理
- 3 代码
- 4 编译
- 5 参考
1 为什么需要连接池
- 可以复用已经建立好的连接,节约与数据库建立连接的时间。原理上和线程池类似。
- 我们项目中有一个实际需求:可能有多个线程同时访问数据库。此时每个线程都需要使用各自独立的数据库连接才能保证线程安全。有了连接池,既可以复用已有的连接,又能保证线程安全。
2 实现原理
db_connection.h 和 db_connection.cpp 是对数据库连接的封装。
db_connection_pool.h 和 db_connection_pool.cpp 是连接池的实现。初始化时会提前创建一批连接放入容器;当池子空了之后,由生产者线程按生产者-消费者模式创建新连接。如果已创建的连接数达到指定上限,就不再创建新连接,此时从连接池获取连接会在等待超时后失败。另外,还有一个单独的线程负责删除长时间未使用的连接,使连接池的大小回落到初始大小。
3 代码
// db_connection.h
#pragma once
#include <string>
#include <chrono>
#include "mysql/mysql.h"
/// Wraps a single MySQL client handle (MYSQL*) together with the time the
/// connection was last marked as used. The connection pool reads that time to
/// decide which idle connections to drop.
class DBConnection {
public:
    DBConnection();
    ~DBConnection();

    // The destructor closes the handle, so a copy would close it a second time.
    DBConnection(const DBConnection&) = delete;
    DBConnection& operator=(const DBConnection&) = delete;

    /// Connects the handle to the given server and database.
    /// @return true on success, false when the connection attempt fails.
    bool connect(std::string host, int port, std::string user, std::string password, std::string dbname);

    /// Executes a statement that produces no result set.
    /// @return true when the statement was executed successfully.
    bool update(std::string sql);

    /// Executes a query.
    /// @return the result-set handle, or nullptr when the statement fails.
    ///         The caller releases it with mysql_free_result().
    MYSQL_RES* query(std::string sql);

    /// Marks the connection as used "now". The pool removes surplus connections
    /// that have not been refreshed for a long time.
    void refreshAliveTime() { m_alivetime = std::chrono::steady_clock::now(); }

    /// @return whole seconds elapsed since the last refreshAliveTime() call
    ///         (or since construction if it was never called).
    int getAliveeTime() const {
        // steady_clock is monotonic, so wall-clock adjustments cannot make a
        // connection look older or younger than it is.
        const auto idle = std::chrono::steady_clock::now() - m_alivetime;
        return static_cast<int>(std::chrono::duration_cast<std::chrono::seconds>(idle).count());
    }

private:
    MYSQL* m_conn = nullptr;
    std::chrono::steady_clock::time_point m_alivetime = std::chrono::steady_clock::now();
};
// db_connection.cpp
#include "db_connection.h"
// Obtains the MYSQL handle from mysql_init(); the returned pointer is stored
// as-is and is not checked here.
DBConnection::DBConnection() : m_conn(mysql_init(nullptr)) {}
// Closes the MySQL handle on destruction, provided one was obtained.
DBConnection::~DBConnection() {
    if (m_conn == nullptr) {
        return;
    }
    mysql_close(m_conn);
}
bool DBConnection::connect(std::string host, int port, std::string user, std::string password, std::string dbname) {
if (mysql_real_connect(m_conn, host.c_str(), user.c_str(), password.c_str(), dbname.c_str(), port, nullptr, 0) == nullptr) {
return false;
}
// mysql_query(m_conn, "set names utf8mb4");
mysql_set_character_set(m_conn, "utf8");
return true;
}
// Executes `sql` on this connection.
// mysql_query() reports success with 0, which is mapped to `true` here.
bool DBConnection::update(std::string sql) {
    const int status = mysql_query(m_conn, sql.c_str());
    return status == 0;
}
// Executes `sql` and returns its result set, or nullptr when the statement
// fails or yields no result set. The caller owns the returned handle and
// releases it with mysql_free_result().
MYSQL_RES* DBConnection::query(std::string sql) {
    if (mysql_query(m_conn, sql.c_str()) != 0) {
        return nullptr;
    }
    // Buffer the whole result on the client. With mysql_use_result() the rows
    // stay on the connection until they are all fetched, so a pooled connection
    // handed back before the result was drained would fail its next statement.
    return mysql_store_result(m_conn);
}
// db_connection_pool.h
#pragma once
#include "db_connection.h"
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
// The member definitions and the call site in main() use
// handler_process::DBConnectionPool, so the class is declared in that namespace.
namespace handler_process {

/// Process-wide pool of MySQL connections, obtained through getInstance().
/// Idle connections are kept in a queue; getConnection() lends one out and it
/// is put back into the queue when the returned shared_ptr is released.
class DBConnectionPool {
public:
    /// @return the single pool instance.
    static DBConnectionPool* getInstance();

    /// Loads the JSON configuration file at `config`, opens the initial
    /// connections, starts the background threads and creates the table.
    /// @return false if any of these steps fails.
    bool init(const std::string& config);

    /// Borrows a connection from the pool.
    /// @return the connection, or nullptr if none became available in time.
    std::shared_ptr<DBConnection> getConnection();

    /// @return the table name read from the configuration file.
    std::string getTable() const { return m_table; }

    // There is exactly one pool; copying it would duplicate the raw pointers
    // stored in the queue.
    DBConnectionPool(const DBConnectionPool&) = delete;
    DBConnectionPool& operator=(const DBConnectionPool&) = delete;

private:
    DBConnectionPool() = default;
    bool loadConfigFile(const std::string& config);
    void produceConnectionTask();   // background thread: opens connections when the queue is empty
    void scannerConnectionTask();   // background thread: removes connections idle for too long
    bool createTable();

private:
    std::string m_host;             // database host
    int m_port = 0;                 // database port
    std::string m_user;             // user name
    std::string m_password;         // password
    std::string m_database;         // database name
    std::string m_table;            // table name
    int m_initSize = 0;             // number of connections opened by init()
    int m_maxSize = 0;              // upper bound checked before opening another connection
    int m_maxIdleTime = 0;          // seconds a queued connection may stay unused
    int m_connectionTimeout = 0;    // seconds getConnection() waits for a connection
    std::queue<DBConnection*> m_connectionQueue;  // idle connections
    std::mutex m_connectionQueueMutex;            // guards m_connectionQueue
    std::atomic_int m_connectionCnt{0};           // number of connections currently in the queue
    std::condition_variable m_cv;                 // signals changes of the queue between threads
};

}  // namespace handler_process
// db_connection_pool.cpp
#include "db_connection_pool.h"
#include <thread>
#include "rapidjson/document.h" // 用来解析json
#include "rapidjson/filereadstream.h"
#include "rapidjson/error/en.h"
namespace handler_process {
// Returns the process-wide pool. The object is a function-local static, so it
// is constructed the first time this function is called.
DBConnectionPool* DBConnectionPool::getInstance() {
    static DBConnectionPool instance;
    DBConnectionPool* const self = &instance;
    return self;
}
bool DBConnectionPool::init(const std::string& config) {
if (!loadConfigFile(config)) {
ASYNC_LOG_ERROR("[DBConnectionPool::init] loadConfigFile failure");
return false;
}
ASYNC_LOG_INFO("[DBConnectionPool::init] init success, create db connection pool, size: {}", m_initSize);
m_connectionCnt = 0;
for (int i = 0; i < m_initSize; ++i) {
DBConnection* conn = new DBConnection();
if (!conn->connect(m_host, m_port, m_user, m_password, m_database)) {
ASYNC_LOG_ERROR("[DBConnectionPool::init] connect failure, host: {}, port: {}, user: {}, dbname: {}", m_host, m_port, m_user, m_database);
delete conn;
continue;
}
conn->refreshAliveTime();
m_connectionQueue.push(conn);
m_connectionCnt++;
}
if (m_connectionCnt.load() == 0) {
ASYNC_LOG_ERROR("[DBConnectionPool::init] init failure, create db connection pool failure");
return false;
}
ASYNC_LOG_INFO("[DBConnectionPool::init] init success, create db connection pool succ, size: {}", m_connectionCnt.load());
// 如果池子空了,这个线程负责构造新的链接
std::thread produce(std::bind(&DBConnectionPool::produceConnectionTask, this));
produce.detach();
// 这个线程负责删除多于的链接
std::thread scanner(std::bind(&DBConnectionPool::scannerConnectionTask, this));
scanner.detach();
if (!createTable()) {
ASYNC_LOG_ERROR("[DBConnectionPool::init] create table failure");
return false;
}
return true;
}
bool DBConnectionPool::loadConfigFile(const std::string& config) {
ASYNC_LOG_INFO("[DBConnectionPool::loadConfigFile] config_file: {}", config);
FILE* fp = fopen(config.c_str(), "r");
if (!fp) {
ASYNC_LOG_ERROR("[DBConnectionPool::loadConfigFile] unable to open file: {}", config);
return false;
}
// 创建文件读取流
char readBuffer[65536];
rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
// 解析JSON数据
rapidjson::Document document;
if (document.ParseStream(is).HasParseError()) {
ASYNC_LOG_ERROR("[DBConnectionPool::loadConfigFile] parse error");
fclose(fp);
return false;
}
fclose(fp);
m_table = std::string(document["table"].GetString());
m_host = std::string(document["host"].GetString());
m_user = std::string(document["user"].GetString());
m_password = std::string(document["password"].GetString());
m_database = std::string(document["database"].GetString());
m_port = document["port"].GetInt();
m_initSize = document["connection_pool_init_size"].GetInt();
m_maxSize = document["connection_pool_max_size"].GetInt();
m_maxIdleTime = document["connection_pool_max_idle_time"].GetInt();
m_connectionTimeout = document["connection_pool_max_wait_time"].GetInt();
ASYNC_LOG_INFO("[DBConnectionPool::loadConfigFile] m_manualInterventionInfoTableName: {}, m_dynamicDBInfoTableName: {}, " \
"m_host: {}, m_user: {}, m_database: {}, m_port: {}, m_initSize: {}, m_maxSize: {}, m_maxIdleTime: {}, m_connectionTimeout: {}",
m_manualInterventionInfoTableName, m_dynamicDBInfoTableName,
m_host, m_user, m_database, m_port, m_initSize, m_maxSize, m_maxIdleTime, m_connectionTimeout);
return true;
}
bool DBConnectionPool::createTable() {
std::shared_ptr<DBConnection> dbConnection = getConnection();
if (!dbConnection) {
ASYNC_LOG_ERROR("[DBConnectionPool::createTable] get connection failure");
return false;
}
std::string create_table_query_1 = "CREATE TABLE IF NOT EXISTS " + m_table +
"(id INT AUTO_INCREMENT PRIMARY KEY)";
return dbConnection->update(create_table_query_1);
}
void DBConnectionPool::produceConnectionTask() {
while (true) {
std::unique_lock<std::mutex> lock(m_connectionQueueMutex);
while (!m_connectionQueue.empty()) {
m_cv.wait(lock);
}
if (m_connectionCnt.load() < m_maxSize) {
DBConnection* conn = new DBConnection();
if (!conn->connect(m_host, m_port, m_user, m_password, m_database)) {
ASYNC_LOG_ERROR("[DBConnectionPool::produceConnectionTask] connect failure, host: {}, port: {}, user: {}, dbname: {}", m_host, m_port, m_user, m_database);
delete conn;
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
conn->refreshAliveTime();
m_connectionQueue.push(conn);
m_connectionCnt++;
ASYNC_LOG_INFO("[DBConnectionPool::produceConnectionTask] produce connection succ, m_connectionCnt: {}", m_connectionCnt.load());
}
m_cv.notify_all();
}
}
void DBConnectionPool::scannerConnectionTask() {
while (true) {
ASYNC_LOG_INFO("[DBConnectionPool::scannerConnectionTask] scanner connection, m_connectionCnt: {}, m_maxIdleTime: {}s", m_connectionCnt.load(), m_maxIdleTime);
std::this_thread::sleep_for(std::chrono::seconds(m_maxIdleTime));
std::unique_lock<std::mutex> lock(m_connectionQueueMutex);
while (m_connectionCnt.load() > m_initSize) {
DBConnection* conn = m_connectionQueue.front();
if (conn->getAliveeTime() >= m_maxIdleTime) {
m_connectionQueue.pop();
m_connectionCnt--;
delete conn;
} else {
break; // 队头的连接没有超过m_maxIdleTime,其它连接肯定没有
}
}
}
}
std::shared_ptr<DBConnection> DBConnectionPool::getConnection() {
std::unique_lock<std::mutex> lock(m_connectionQueueMutex);
while (m_connectionQueue.empty()) {
if (m_cv.wait_for(lock, std::chrono::seconds(m_connectionTimeout)) == std::cv_status::timeout) {
if (m_connectionQueue.empty()) {
ASYNC_LOG_ERROR("[DBConnectionPool::getConnection] get connection timeout: {}", m_connectionTimeout);
return nullptr;
}
}
}
std::shared_ptr<DBConnection> conn(m_connectionQueue.front(), [&](DBConnection* pconn) {
std::unique_lock<std::mutex> lock(m_connectionQueueMutex);
pconn->refreshAliveTime();
m_connectionQueue.push(pconn);
m_connectionCnt++;
});
m_connectionQueue.pop();
m_connectionCnt--;
m_cv.notify_all();
ASYNC_LOG_INFO("[DBConnectionPool::getConnection] get connection success, residue m_connectionCnt: {}", m_connectionCnt.load());
return conn;
}
配置文件:
// db_conf.json
{
"table": "test_table",
"host": "10.10.10.123",
"user": "root",
"password": "123456",
"database": "test_db",
"port": 3306,
"connection_pool_init_size": 5,
"connection_pool_max_size" : 50,
"connection_pool_max_idle_time": 600,
"connection_pool_max_wait_time": 5
}
使用方式:
// main.cpp
#include <iostream>
#include "db_connection_pool.h"
// Usage example: initializes the pool from db_conf.json, borrows a connection,
// selects the row with id = 1 from the configured table and prints the ids read.
// Returns 0 on success and -1 when the pool cannot be initialized, no connection
// can be borrowed, or the query fails.
int main(int argc, char** argv) {
    // The pool lives in namespace handler_process; use it consistently.
    using handler_process::DBConnectionPool;

    if (!DBConnectionPool::getInstance()->init("db_conf.json")) {
        std::cerr << "init db connection pool failure" << std::endl;
        return -1;
    }

    std::shared_ptr<DBConnection> dbConnection = DBConnectionPool::getInstance()->getConnection();
    // getConnection() returns nullptr when no connection became available in time.
    if (!dbConnection) {
        std::cerr << "get connection failure" << std::endl;
        return -1;
    }

    std::string querySql = "SELECT * FROM " + DBConnectionPool::getInstance()->getTable() + " WHERE id = 1";
    MYSQL_RES* res = dbConnection->query(querySql);
    if (res == nullptr) {
        std::cerr << "query err" << std::endl;
        return -1;
    }

    MYSQL_ROW column;
    while ((column = mysql_fetch_row(res)) != nullptr) {
        // A SQL NULL is delivered as a null pointer, which must not reach std::stoi.
        if (column[0] == nullptr) {
            continue;
        }
        try {
            const int id = std::stoi(column[0]);
            std::cout << "id: " << id << std::endl;
        } catch (const std::exception& e) {
            std::cerr << "exception: " << e.what() << std::endl;
            continue;
        }
    }
    mysql_free_result(res);
    return 0;
}
4 编译
如果是 CentOS,使用 yum 安装 MySQL 客户端及开发库:
yum install -y mysql-devel mysql
CMakeLists.txt
...
# Add the directory that holds the MySQL client library to the linker search path.
link_directories("/usr/lib64/mysql")
# Name of the MySQL client library to link against.
set(MYSQL_LIBS mysqlclient)
# Build the example executable from the demo entry point and the two pool sources.
add_executable(main main.cpp db_connection.cpp db_connection_pool.cpp)
# Link the thread library (the pool starts std::thread workers) and the MySQL client library.
target_link_libraries(main pthread ${MYSQL_LIBS})
5 参考
C++ mysql连接池 附带讲解与详细注释源码