当前位置: 首页 > article >正文

ElasticSearch09-并发控制

零、文章目录

ElasticSearch09-并发控制

1、文档冲突

  • 在Elasticsearch中,文档冲突通常发生在并发更新时,多个进程或线程尝试修改同一个文档,但是没有正确的版本控制机制来处理这些并发操作。
(1)文档冲突的场景
  • 并发更新:当多个进程几乎同时对同一个文档进行更新时,如果它们的版本号相同或旧版本尝试覆盖新版本,就会发生冲突。
  • 批量操作:在批量更新或删除操作中,如果文档的版本号在操作执行期间发生变化,也可能导致冲突。
(2)解决文档冲突的方法
  • 乐观锁机制:Elasticsearch使用乐观锁机制,基于文档版本实现并发控制。每次更新或删除数据时,都需要对比版本号。
  • 外部版本控制:使用external版本类型,可以让外部系统(如数据库)控制版本号。当索引请求的版本号大于当前存储文档的版本时,文档才会被更新。
  • 使用文档版本号:Elasticsearch中的每个文档都有一个版本号(_version),在更新文档时,可以指定期望的版本号。如果指定的版本号低于实际版本号,更新会失败,并返回409冲突错误。
  • 使用if_seq_noif_primary_term:对于索引操作,可以使用if_seq_noif_primary_term参数来确保操作只在特定序列号和primary term的文档上执行,从而避免冲突。
  • 批量操作中的冲突处理:在执行批量更新(update_by_query)时,可以通过设置conflicts=proceed参数来忽略冲突并继续更新其他文档。
  • 重试机制:对于单个更新操作,可以使用retry_on_conflict参数来设置冲突后的重试次数。
(3)乐观并发控制(OCC)
  • 乐观并发控制基于这样的假设:事务之间的冲突很少发生,因此它允许事务在没有锁定任何资源的情况下进行。OCC通常在事务提交时检查是否违反了一致性条件。
  • 特点
    • 无锁:在事务执行期间,不锁定任何资源,因此可以提高并发性。
    • 冲突检测:在事务提交时检测冲突,如果发现冲突,则拒绝当前事务。
    • 重试机制:如果事务被拒绝,可以重新启动并重试。
    • 适用于冲突较少的环境:在冲突概率较低的情况下,OCC可以提高系统性能。
  • 实现方式
    • 版本号机制:每个数据项都有一个版本号,事务开始时记录版本号,提交时检查版本号是否变化。
    • 时间戳机制:事务根据时间戳顺序执行,时间戳较晚的事务在检测到冲突时需要等待或回滚。
(4)悲观并发控制(PCC)
  • 悲观并发控制基于这样的假设:事务之间的冲突很常见,因此它在事务开始时就锁定需要的资源,以防止其他事务访问。
  • 特点
    • 锁定资源:事务在执行过程中锁定资源,其他事务不能访问被锁定的资源。
    • 冲突预防:通过锁定资源来预防冲突,确保事务的隔离性。
    • 可能导致死锁:锁定机制可能导致死锁,需要额外的机制来检测和解决死锁。
    • 适用于冲突较多的环境:在冲突概率较高的情况下,PCC可以确保数据的一致性。
  • 实现方式
    • 行级锁:锁定数据库中的特定行或记录。
    • 表级锁:锁定整个表,阻止其他事务对表的访问。
    • 排它锁和共享锁:排它锁(Exclusive Locks)允许事务修改数据,共享锁(Shared Locks)允许多个事务读取数据。

2、乐观并发控制

(1)实现乐观控制
  • _version:es 每个文档都有版本号,每次更新版本号都会更新,老版本的 es 在更新数据的时候可以在 url 加上参数 _version=版本号,这样更新的时候就会对比版本,版本不匹配,更新就不成功。
  • if_seq_noif_primary_term:新版本的 es 已经不允许直接使用 version 来控制版本,但是可以使用if_seq_noif_primary_term这两个版本号衍生出来的字段来控制,效果是一样的。
(2)创建索引
# 请求
PUT my_index_1

# 返回
{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "index" : "my_index_1"
}
(3)添加文档
  • 首先,我们索引一个新文档,当前版本号是 1
# 请求
POST /my_index_1/_doc/1
{
  "title": "Elasticsearch版本控制",
  "content": "学习Elasticsearch版本控制的重要性。"
}

# 返回
{
  "_index" : "my_index_1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}
(4)更新文档
  • 我们使用参数if_seq_no=0&if_primary_term=1 来更新文档
# 请求
POST /my_index_1/_doc/1?if_seq_no=0&if_primary_term=1
{
  "content": "深入理解Elasticsearch版本控制的机制。"
}

# 返回
{
  "_index" : "my_index_1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}
(5)并发更新
  • 假设另一个线程也尝试更新同一个文档,但是使用的是旧的版本信息,将返回409错误,表示版本已经过时。
# 请求
POST /my_index_1/_doc/1?if_seq_no=0&if_primary_term=1
{
  "content": "探索Elasticsearch版本控制的最佳实践。"
}

# 返回
{
  "error" : {
    "root_cause" : [
      {
        "type" : "version_conflict_engine_exception",
        "reason" : "[1]: version conflict, required seqNo [0], primary term [1]. current document has seqNo [1] and primary term [1]",
        "index_uuid" : "yQKxbXSMTu-S2w28Quakjw",
        "shard" : "0",
        "index" : "my_index_1"
      }
    ],
    "type" : "version_conflict_engine_exception",
    "reason" : "[1]: version conflict, required seqNo [0], primary term [1]. current document has seqNo [1] and primary term [1]",
    "index_uuid" : "yQKxbXSMTu-S2w28Quakjw",
    "shard" : "0",
    "index" : "my_index_1"
  },
  "status" : 409
}
(6)处理冲突
  • 我们可以在客户端实现重试逻辑。如果收到409错误,我们可以获取最新的文档版本信息,然后重试更新
# 请求
GET /my_index_1/_doc/1

# 返回
{
  "_index" : "my_index_1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "content" : "深入理解Elasticsearch版本控制的机制。"
  }
}
  • 使用新版本信息更新成功
# 请求
POST /my_index_1/_doc/1?if_seq_no=1&if_primary_term=1
{
  "content": "探索Elasticsearch版本控制的最佳实践。"
}

# 返回
{
  "_index" : "my_index_1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 3,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 2,
  "_primary_term" : 1
}

3、外部并发控制

(1)外部版本控制场景
  • 一个常见的场景是使用其它数据库作为主要的数据存储,使用 Elasticsearch 做数据检索,这意味着主数据库的所有更改发生时都需要被复制到 Elasticsearch,如果多个进程负责这一数据同步,你可能遇到类似于之前描述的并发问题。此时需要控制版本号大于 es 版本号。
(2)三种控制方式对比
  • 乐观并发控制(_version):老版本 es 使用,传入的版本号必须和 es 数据版本号一致。
  • 乐观并发控制(if_seq_noif_primary_term):新版本 es 使用,传入的参数必须和 es 数据一致。
  • 外部版本控制:通过version_type=external&version=版本号参数来实现。但是这里的版本号必须是大于 es 数据中的版本号,小于等于会报错。
(3)低版本更新文档
  • 当外部版本号小于等于 es 版本号时,更新失败。
# 请求
POST /my_index_1/_doc/1?version_type=external&version=3
{
  "content": "探索Elasticsearch版本控制的最佳实践。222"
}

# 返回
{
  "error" : {
    "root_cause" : [
      {
        "type" : "version_conflict_engine_exception",
        "reason" : "[1]: version conflict, current version [3] is higher or equal to the one provided [3]",
        "index_uuid" : "yQKxbXSMTu-S2w28Quakjw",
        "shard" : "0",
        "index" : "my_index_1"
      }
    ],
    "type" : "version_conflict_engine_exception",
    "reason" : "[1]: version conflict, current version [3] is higher or equal to the one provided [3]",
    "index_uuid" : "yQKxbXSMTu-S2w28Quakjw",
    "shard" : "0",
    "index" : "my_index_1"
  },
  "status" : 409
}
(4)高版本更新文档
# 请求
POST /my_index_1/_doc/1?version_type=external&version=4
{
  "content": "探索Elasticsearch版本控制的最佳实践。222"
}

# 返回
{
  "_index" : "my_index_1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 4,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 3,
  "_primary_term" : 1
}

http://www.kler.cn/a/441896.html

相关文章:

  • (7)(7.2) 围栏
  • mysql查看binlog日志
  • ASP.NET Core 中的 JWT 鉴权实现
  • 微软宣布Win11 24H2进入新阶段!设备将自动下载更新
  • ChatGPT 写作系列
  • 密钥轮换时,老数据该如何处理
  • 【HarmonyOS之旅】HarmonyOS开发基础知识(一)
  • CCNP_SEC_ASA 第四天作业
  • SAP PP 死循环bom,递归BOM的问题 ,再bom保存时校验
  • python练习之“用 Python 的 Pygame 库创建五子棋游戏”
  • Spring Boot 声明式事务
  • CentOs7使用yum安装docker
  • UITableView显示数据,增加数据,删除数据及移动数据行
  • 完全二叉树【东北大学oj数据结构9-1】C++
  • 面试之手撸安全队列
  • 栈(线性表2)
  • 关于opengauss
  • 《半导体芯片制程:微观世界里的科技风云》
  • 蓝桥杯数列求值(2019试题C)
  • 【系统】Windows11更新解决办法,一键暂停
  • 安卓课设版算法计算器
  • 用.Net Core框架创建一个Web API接口服务器
  • lambda 表达式 闭包写法
  • 模具生产过程中的标签使用流程图
  • 前端的Python入门指南(完):错误和异常处理策略及最佳实践
  • YOLOv9-0.1部分代码阅读笔记-activations.py