当前位置: 首页 > article >正文

《Redis实战》note-10 扩展Redis

在这里插入图片描述

文章目录

    • 助记提要
    • 10章 扩展Redis
      • 10.1 扩展读性能
        • 通过复制特性添加从服务器
      • 10.2 扩展写性能和内存容量
        • 扩展之前降低内存占用和优化性能
        • 对分片连接的配置和使用
      • 10.3 扩展搜索型查询
        • 使从服务器可写
        • 将搜索索引分片
        • 基于SORT实现索引的分片搜索操作
        • 基于有序集合实现索引的分片搜索操作
        • 10.4 扩展社交网站
        • 已发布状态消息的扩展
        • 时间线的扩展
        • 扩展关注和粉丝列表

助记提要

  1. 如何处理从服务器重同步问题;
  2. Redis故障转移的原理;
  3. 降低内存占用和性能优化的方法总结;
  4. 基于分片的连接装饰器;
  5. 搜索型查询如何扩展;
  6. 分片搜索操作的过程 2步;
  7. 使用分片扩展社交网站

10章 扩展Redis

主从复制 水平分片

10.1 扩展读性能

通过复制特性添加从服务器

在配置中加slaveof host port,或者向服务器发送SLAVEOF host port命令,就可以把服务器变为指定主服务器的从服务器。

  • 注意:
  1. 从服务器的写入配置一般是关闭状态,对从服务器写入会引发错误。
  2. 服务器成为从服务器后,原本的数据将被清空。
  • 从服务器重同步问题
    主服务器同时向多个从服务器发送副本时,可能会把主服务器的带宽消耗尽。
    可以使用主从复制树减少主服务器需要直接同步的从服务器数。
    也能对网络连接进行压缩,减少每次需要传送的数据量。

使用带压缩的SSH隧道进行连接,其加密开销不大。过高的压缩级别会给低处理器的从服务器带来麻烦,最好控制在5级以下。

  • 故障转移
    Redis Sentinel服务器,是特殊模式的Redis服务器,可以监视一系列主服务器和它们的从服务器。
    通过向主服务器发送PUBLISH、SUBCRIBE命令,向主、从服务器发送PING命令,Sentinel进程可以自主识别可用的从服务器和其他Sentinel。
    主服务器失效的时候,监视该主服务器的所有Sentinel会基于彼此公有的信息选出一个Sentinel,并从现有从服务器中选出一个作为新的主服务器。被选中的Sentinel会让剩余的从服务器去复制这个新的主服务器。

10.2 扩展写性能和内存容量

达到服务器性能瓶颈时,需要把数据分片到多台机器组成的群组里面。

扩展之前降低内存占用和优化性能
  • 优化程序,降低需要读取的数据量;
  • 把不相关的功能迁移到其他服务器;
  • 一些统计型的数据,在写入之前,先做聚合计算;
  • 使用锁代替可能限制速度的WATCH/MULTI/EXEC事务,或者使用Lua及脚本;
  • AOF持久化时,使用合适的配置,避免频繁的硬盘写入;
对分片连接的配置和使用

根据名字进行创建或重用Redis连接

def get_redis_connection(component, wait=1):
    key = 'config:redis:' + component
    old_config = CONFIGS.get(key, object())
    # 获取新配置,config_connection是放有配置信息的Redis的连接
    config = get_config(config_connection, 'redis', component, wait)
    # 如果新旧配置不同,就创建一个新的连接
    if config != old_config:
        REDIS_CONNECTIONS[key] = redis.Redis(**config)
    return REDIS_CONNECTIONS.get(key)

根据分片信息获取该分片的连接

def get_sharded_connection(component, key, shard_count, wait=1):
    # shard格式:组件名:分片id,分片id由key计算得到
    shard = shard_key(component, 'x'+str(key), shard_count, 2)
    return get_redis_connection(shard, wait)

支持分片功能的连接装饰器

def sharded_connection(component, shard_count, wait=1):
    # 装饰器接收组件名和预期分片数为参数
    def wrapper(function):
        @functools.wraps(function)
        def call(key, *args, **kwargs):
            # 获取分片连接
            conn = get_sharded_connection(component, key, shard_count, wait)
            # 实际调用被装饰的函数
            return function(conn, key, *args, **kwargs)
        return call
    return wrapper

在分片环境下统计唯一访客数

# 把函数分在16个机器上执行,判定访客是否唯一的集合在每台机器的多个数据库键下
@sharded_connection('unique', 16)
def count_visit(conn, session_id):
    today = date.today()
    key = 'unique:%s' % today.isoformat()
    conn2, expected = get_expected(key, today)
    
    id = int(session_id.replace('-', '')[:15], 16)
    if shard_sadd(conn, key, id, expected, SHARD_SIZE):
        # 非分片连接对唯一计数器执行自增
        conn2.incr(key)

# get_expected函数使用非分片的连接
@redis_connection('unique')
def get_excepted(conn, key, today):
    ...
    # 返回非分片连接,使count_visit可以对唯一计数器执行自增操作
    retrun conn, EXPECTED[key]

使用SETBIT、BITCOUNT和BITOP对二进制位数组进行索引查找,可以在不分片的情况下实现唯一访问计数器。
Redis系统是单线程设计,在多核高带宽的机器上,可以运行多个Redis服务器。只要对这些服务器进行配置,分配不同的端口、不同的快照配置即可。

10.3 扩展搜索型查询

Redis执行的查询不只是取值和写入,对于复杂的查询操作,仅仅对数据进行分片达不到扩展的目的。
搜索型查询就是这样的复杂查询。

使从服务器可写

搜索型查询需要执行SUNIONSTORE、SINTERSTORE、SDIFFSTORE、ZINTERSTORE和ZUNIONSTORE等命令,这些命令都需要对Redis写入。而从服务器默认是只读状态,无法写入。

Redis配置中有个选项slave-read-only,默认为yes,改为no就能在从服务器上执行搜索查询了。不过需要注意重同步的问题。
搜索查询的结果只是被缓存在了执行过查询的从服务器上,如果需要重用被缓存的结果,就需要执行“定期持久化”操作,比如让客户端向相同的Web服务器发请求、Web服务器又向相同的Redis服务器发请求。

将搜索索引分片

搜索索引的大小总会随着时间不断地增长,
搜索查询分片的第一步,需要先对搜索索引进行分片,并且使被索引的每个文档,同一个文档的所有数据都分在同一个分片里。

前面使用的建索引的方法index_document(),会接收连接对象。可以直接传入分片连接,使其把索引建在分片上。或者使用自动分片装饰器操作。

基于SORT实现索引的分片搜索操作

索引分片后,只需要对分片进行查询就可以取得搜索结果。
分片搜索操作可以分为两步:在所有分片上执行搜索查询操作;然后将各个分片的搜索结果进行整合。

执行查询的方法,对基于SORT实现的索引和基于有序集合实现的索引不一样。SORT实现的索引需要统一排序的数据类型。

在单分片上执行的搜索函数

# 函数的参数和之前非分片的search_and_sort相同
def search_get_values(conn, query, id=None, ttl=300, sort="-updated", start=0, num=20):
    # 搜索和排序,取从0开始到start+num位的排序结果
    count, docids, id = search_and_sort(conn, query, id, ttl, sort, 0, start+num)
    key = 'kb:doc:%s'
    sort = sort.lstrip('-')
    # 按排序的顺序取文档的更新时间
    pipe = conn.pipeline(False)
    for docid in docids:
        pipe.hget(key % docid, sort)
    sort_column = pipe.execute()
    # 文档id对应文档的更新时间
    data_pairs = zip(docids, sort_column)
    return count, data_pairs, id

在取后面页的结果时,如取第91-100条结果,由于程序只知道需要比它第90条大的数据,并不知道整合后的这条数据在各个分片上的起始位置start,因此程序在每个分片上都必须取前面的全部100条数据。

在所有分片上执行上述函数

def get_shard_results(component, shards, query, ids=None, ttl=300, sort="-updated", start=0, num=20, wait=1):
    # component是组件名称,shards是分片数
    count = 0
    data = []
    # 如果传入已被缓存的ids,就使用,否则重新执行查询
    ids = ids or shards * [None]
    for shard in range(shards):
        # 获取或创建一个分片连接
        conn = get_redis_connection('%s:%s' % (component, shard), wait)
        # 获取搜索结果,包括选到的文档的更新时间,用于合并
        c, d, i = search_get_values(conn, query, ids[shard], ttl, sort, start, num)
        # 合并计算结果
        count += c
        data.extend(d)
        ids[shard] = i
    return count, data, ids

这个操作可以利用Python的线程并行执行。

对分片搜索结果整合和选取

def to_numeric_key(data):
    try:
        # Decimal类型可以合理地转换整数和浮点数,对缺失值和非数值返回0
        return Decimal(data[1] or '0')
    except:
        return Decimal('0')

def to_string_key(data):
    # 总是返回字符串
    return data[1] or ''

def search_shards(component, shards, query, ids=None, ttl=300, sort="-updated", start=0, num=20, wait=1):
    # 获取所有分片的搜索结果
    count, data, ids = get_shard_results(component, shards, query, ids, ttl, sort, start, num, wait)
    
    # 整理排序所需的参数
    reversed = sort.startswith("-")
    sort = sort.strip('-')
    key = to_numeric_key
    if sort not in ('updated', 'id', 'created'):
        key = to_string_key
    
    # 对搜索结果做排序
    data.sort(key=key, reversed=reversed)
    results = []
    # 仅取指定的页
    for docid, score in data[start:start+num]:
        results.append(docid)
    # ids内是各个分片查询的缓存id
    return count, results, ids

Decimal可以用且更少的代码得到相同的排序,且能正确处理无限大小的数字。
排序时需要考虑缺失值,并把类型统一。

基于有序集合实现索引的分片搜索操作

在单个分片上执行的搜索函数

def search_get_zset_values(conn, query, id=None, ttl=300, update=-1, vote=0, start=0, num=20, desc=True):
    # 取得搜索结果的文档数和id
    count, r, id = search_and_zsort(conn, query, id, ttl, update, vote, 0, start + num - 1, desc)
    
    # 获取搜索结果的分值
    if desc:
        data = conn.zrevrange(id, 0, start + num - 1, withscores=True)
    else:
        data = conn.zrange(id, 0, start + num - 1, withscores=True)
    return count, data, id

这里有序集合的分值是复合索引,取后续页的时候也无法确定起始位置,因此zrevrange也需返回前面全部搜索结果。可以使用zrevrangebyscore,直接用分值比较,但是函数需要多加参数,变得复杂。

整合分片结果

def search_shards_zset(component, shards, query, ids=None, ttl=300, update=-1, vote=0, start=0, num=20, desc=True, wait=1):
    count = 0
    data = []
    ids = ids or shards * [None]
    for shard in range(shards):
        # 获取分片连接
        conn = get_redis_connection('%s:%s' % (component, shard), wait)
        c, d, i = search_get_zset_values(conn, query, ids[shard], ttl, update, vote, start, num, desc)
        # 合并结果
        count += c
        data.extend(d)
        ids[shard] = i
    
    def key(result):
        # 仅返回与分值有关的信息
        return result[1]
    
    # 排序结果
    data.sort(key=key, reversed=desc)
    results = []
    # 从结果里取出文档id,把分值丢弃
    for docid, score in data[start:start+num]:
        results.append(docid)

    return count, results, ids
10.4 扩展社交网站

如何通过分片对社交网站进行扩展。
扩展的第一步就是找出经常被读取和写入的数据,并且考虑把常用和不常用数据分开。

社交网站的数据主要三种:状态消息、时间线、关注和粉丝列表。

已发布状态消息的扩展

状态消息基于散列存储,所以可以基于散列所在的键,将其分片到由多个散列组成的集群里面。
也可以在Redis中仅存储最新发布的消息,而旧的较少读取的消息存在硬盘型数据库中。

添加从服务器只是读写分离,减轻主服务器的读压力。而将数据分片到多个服务器上面,可以降低每个服务器的写压力。

时间线的扩展

主页时间线和列表时间线都比较短,所以不需要对时间线中的内容做分片,而是根据时间线的键名把它们分别存到不同的分片上。

个人时间线,大部分人发布的消息很少,但是少数用户会发布极多的消息。实用的做法是限制每个用户的时间线最多只存2万消息,旧的信息会删除或隐藏。

sharded_timelines = KeyShardedConnection('timelines', 8)

def follow_user(conn, uid, other_uid):
    fkey1 = 'following:%s' % uid
    fkey2 = 'followers:%s' % other_uid
    if conn.zscore(fkey1, other_uid):
        print('already followed', uid, other_uid)
        return None
    
    now = time.time()
    
    pipeline = conn.pipeline(True)
    pipeline.zadd(fkey1, other_uid, now)
    pipeline.zadd(fkey2, uid, now)
    pipeline.zcard(fkey1)
    pipeline.zcard(fkey2)
    following, followers = pipeline.execute()[-2:]
    pipeline.hset('user:%s' % uid, 'following', following)
    pipeline.hset('user:%s' % other_uid, 'followers', followers)
    pipeline.execute()
    
    # 从被关注者的个人时间线取最新的动态
    pkey = 'profile:%s' % other_uid
    status_and_score = sharded_timelines[pkey].zrevrange(pkey, 0, HOME_TIMELINE_SIZE-1, withscores=True)
    
    if status_and_score:
        hkey = 'home:%s' % uid
        # 根据被分片的键获取连接
        pipe = sharded_timelines[hkey].pipeline(True)
        # 将关注者的动态加到分片的主页时间线,并修剪
        pipe.zadd(hkey, **dict(status_and_score))
        pipe.zremrangebyrank(hkey, 0, -HOME_TIMELINE_SIZE-1)
        pipe.execute()
    return True

通用的分片连接类,需要指定组件和分片数,就可以通过被访问的键创建分片连接。

class KeyShardedConnection(object):
    def __init__(self, component, shards):
        # 组件名和分片数
        self.component = component
        self.shards = shards
    
    def __getitem__(self, key):
        # 根据要查找的键,获取其所在分片的连接
        return get_sharded_connection(self.component, key, self.shards)
扩展关注和粉丝列表

用户关注人数可以设人数限制,但是用户的粉丝数无法设限。粉丝列表过大的话,还是需要把它们所在的有序集合分到多个分片上。
前面实现的关注动作是按数据的键做的分片,但是现在需要按关注数据来分片。
同时,粉丝和被关注者放在同一个分片上,可以显著减少创建和调用的连接数。程序使用粉丝和被关注者的两个id一起当做查找分片的参数。

对粉丝有序集合和被关注者有序集合做分片后的关注操作

sharded_timelines = KeyShardedConnection('timelines', 8)
sharded_followers = KeyShardedConnection('followers', 16)

def follow_user(conn, uid, other_uid):
    fkey1 = 'following:%s' % uid
    fkey2 = 'followers:%s' % other_uid
    
    # 根据两个用户的id获取连接对象
    sconn = sharded_followers[uid, other_uid]
    if sconn.zscore(fkey1, other_uid):
        return None
    
    now = time.time()
    spipe = sconn.pipeline(True)
    spipe.zadd(fkey1, other_uid, now)
    spipe.zadd(fkey2, uid, now)
    following, followers = spipe.execute()
    
    # 更新关注数和粉丝数
    pipeline = conn.pipeline(True)
    pipeline.hincrby('user:%s' % uid, 'following', int(following))
    pipeline.hincrby('user:%s' % other_uid, 'followers', int(followers))
    pipeline.execute()
    
    pkey = 'profile:%s' % other_uid
    status_and_score = sharded_timelines[pkey].zrevrange(pkey, 0, HOME_TIMELINE_SIZE-1, withscores=True)
    
    if status_and_score:
        hkey = 'home:%s' % uid
        pipe = sharded_timelines[hkey].pipeline(True)
        pipe.zadd(hkey, **dict(status_and_score))
        pipe.zremrangebyrank(hkey, 0, -HOME_TIMELINE_SIZE-1)
        pipe.execute()
    return True

前面的通过键获取分片连接的类,改为通过id对获取连接。

calss KeyDataShardedConnection(object):
    def __init__(self, component, shards):
       ...
    
    def __getitem__(self, ids):
        # 确保传入的id都是整数
        id1, id2 = map(int, ids)
        # 确保第一个id较小
        if id2 < id1:
            id1, id2 = id2, id1
        # 两个id构建一个键
        key = "%s:%s" % (id1, id2)
        # 使用构建的键获取分片连接
        return get_sharded_connection(self.component, key, self.shards)

较小的id放在前面,这样用户无论何时以什么顺序访问两个id,得到的分片总是一样的。

通过分片连接器,可以对其他的有序集合操作进行更新。其他操作的好多处都用到了ZRANGEBYSCORE命令获取粉丝,因此先做一个分片版本的ZRANGEBYSCORE。

def sharded_zrangebyscore(component, shards, key, min, max, num):
    data = []
    for shard in range(shards):
        # 获取分片连接
        conn = get_redis_connection("%s:%s" % (component, shard))
        # 从分片上取出数据
        data.extend(conn.zrangebyscore(key, min, max, start=0, num=num, withscores=True))
    
    def key(pair):
        return pair[1], pair[0]
    # 先基于分值排序,再基于成员排序
    data.sort(key=key)
    return data[:num]

可以根据固定分值进行排序的时候,直接从分片上取大于某个分值的一页数据就可以。

对动态广播函数做更新

def syndicate_status(uid, post, start=0, on_lists=False):
    root = 'followers'
    key = 'followers:%s' % uid
    base = 'home:%s'
    if on_lists:
        root = 'list:out'
        key = 'list:out:%s' % uid
        base = 'list:statuses:%s'
    
    # 找出关注者
    followers = sharded_zrangebyscore(root, sharded_followers.shards, key, start, 'inf', POSTS_PER_PASS)
    
    # 对粉丝的主页时间线的键按分片进行分组
    to_send = defaultdict(list)
    for follower, start in followers:
        timeline = base % follower
        shard = shard_key('timelines', timeline, sharded_timelines.shards, 2)
        to_send[shard].append(timeline)
    
    for timelines in to_send.itervalues():
        pipe = sharded_timelines[timelines[0]].pipeline(False)
        for timeline in timelines:
            # 添加新消息,移除过旧的消息
            pipe.zadd(timeline, **post)
            pipe.zremrangebyrank(timeline, 0, -HOME_TIMELINE_SIZE-1)
        pipe.execute()
    
    conn = redis.Redis()
    if len(followers) >= POSTS_PER_PASS:
        execute_later(conn, 'default', 'syndicate_status', [uid, post, start, on_lists])
    elif not on_lists:
        execute_later(conn, 'default', 'syndicate_status', [uid, post, 0, on_lists])

http://www.kler.cn/news/367885.html

相关文章:

  • 使用Python计算相对强弱指数(RSI)进阶
  • Linux - 文件描述符 | 文件系统 | 软硬链接
  • 【Docker】docker | 部署nginx
  • WSL(Ubuntu20.04)编译和安装DPDK
  • Spring Boot框架下的酒店住宿登记系统
  • ThinkPad T480拆机屏幕改装:便携式显示器DIY指南
  • 【MySQL】C语言连接MySQL数据库2——基本API的学习
  • 手把手教——class1_VScode配置C++环境
  • 大粤金融智能交易系统的创新与应用
  • FPGA 蜂鸣器 音乐播放器
  • 【Docker命令】日常使用的Docker命令
  • Pandas库学习Day21
  • javaWeb项目-ssm+vue高校网课管理系统功能介绍
  • Cursor零基础小白教程系列 - 创建你的第一个Cursor 项目
  • CSS伪元素以及伪类和CSS特性
  • 获 Sei 基金会投资的 MetaArena :掀起新一轮链上游戏革命
  • Adam优化器算法详解
  • 【C++复习】第二弹-内存管理
  • 3.Linux按键驱动-添加循环队列
  • 【Android】多渠道打包配置
  • Android 自定义 Dialog 实现列表 单选,多选,搜索
  • Python4
  • 大学新生如何入门编程:选择语言、制定计划及避开学习陷阱
  • Page Cache(页缓存
  • 学习记录:js算法(七十五): 加油站
  • 【C++】异常处理实例详解