高并发性能优化随笔
高并发性能优化技巧
目录
- 高并发性能优化技巧
- 目录
- MySQL
- 索引
- 分库分表
- 注意事项
- 做好 SQL 监控
- 缓存
- 使用 Redis 加快访问
- Redis 大 Key 问题
- Redis 热 Key 问题
- 缓存击穿
- Memcached 双活架构
- 集群隔离
- 多级缓存
- 消息队列与流量聚合
- MQ
- 流量聚合
- 线程池
- 总结
在我的高并发技巧系列文章中,我已经介绍了不少性能优化的技巧。为了保证这篇文章的完整性,可能会有一些重复。本文将重点介绍 C 端开发中常用的中间件(MySQL、Redis、MQ、RPC)如何提高性能,并在最后介绍一些奇淫巧技。
MySQL
索引
关于 MySQL 优化的帖子网上已经很多,这里只强调一些关键点:
- 索引:MySQL 的所有 C 端查询应尽量使用索引,尤其是数据量增长快的表。避免在索引字段上进行函数运算或使用取反操作。
- 联合索引:联合索引使用非常广泛,SQL 语句写法不当可能导致全表扫描。记住最左匹配原则,将区分度高的字段放在前面。
- 分页查询:C 端经常展示列表,如文章列表、作品列表等,基本都用到了分页查询。确保
order by
字段在索引中,否则可能会出现filesort
,这在数据量大时非常致命。 - 养成好习惯:创建表时就设计好索引,不要等到出问题后再加,那时数据量可能已经很大。
分库分表
不要无脑分库分表,尤其在中小公司,这会增加维护成本。一般预估年增量记录在 500 万以内都不需要分,之前我们是 1000 万以内不分。如果只是临时活动使用的表,更不需要分,除非活动期间量真的达到了几亿。简单介绍下分的原则:
- 数据量大就分表:单表查询性能会随着数据量增加而下降。
- 并发高就分库:单库的资源有限,支持的连接数不会太高。
- 大多数情况下分库分表一起进行:因为量大和并发高经常同时出现。
- 一开始分就给足资源:二次分表成本太大,在快手一般上来就是 10 个库和 1000 张表。
- 特别注意 shardKey:围绕用户展开的就用
userId
作为 shardKey,这在绝大多数情况下没问题。
如何分,见仁见智,可以使用 ShardingJDBC,优点是减少开发成本。也可以自己手动分,优点是便于定制化和制定规则。如果你犹豫不决,就用 ShardingJDBC,尤其是小团队。
注意事项
- 减少操作数据库次数:例如批量更新用户奖励时,避免多次调用
update
方法,可以一次批量更新解决。 - 使用迭代器查询:如果要查询的数据量很多,如瓜分活动需要查出来所有参与并完成活动的用户,假设表有 2000 万数据,满足要求的用户可能有百万。正确的做法是分批查询,每次查询 200 条,然后进行处理。可以参考下面的迭代器写法:
import com.google.common.collect.AbstractIterator;
import org.apache.commons.collections.CollectionUtils;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
public class SimpleSingeFieldIterable<I extends Comparable<I>, R> implements Iterable<List<R>> {
private final BiFunction<I, Integer, List<R>> searchDAO;
private I searchPosition;
private final Integer count;
private final Function<R, I> model2IdFunction;
public SimpleSingeFieldIterable(I searchPosition, Integer count,
BiFunction<I, Integer, List<R>> searchDAO, Function<R, I> model2IdFunction) {
this.searchPosition = searchPosition;
this.count = count;
this.searchDAO = searchDAO;
this.model2IdFunction = model2IdFunction;
}
@Override
public Iterator<List<R>> iterator() {
return new SingeFieldIterator();
}
class SingeFieldIterator extends AbstractIterator<List<R>> {
private boolean needContinue = true;
@Override
protected List<R> computeNext() {
if (!needContinue) {
return endOfData();
}
List<R> result = searchDAO.apply(searchPosition, count);
if (CollectionUtils.isEmpty(result)) {
needContinue = false;
return endOfData();
}
if (result.size() < count) {
needContinue = false;
}
searchPosition = model2IdFunction.apply(result.get(result.size() - 1));
return CollectionUtils.isNotEmpty(result) ? result : endOfData();
}
}
}
使用示例:
// 每次查询出来 200 条 DocLibrary
SimpleSingeFieldIterable<Long, DocLibrary> docIterable =
new SimpleSingeFieldIterable<>(0L, 200,
(minId, count) -> baseMapper.getAllList(minId, count), DocLibrary::getId);
AtomicInteger counter = new AtomicInteger(0);
docIterable.forEach(list -> {
LOGGER.info("start batchUpdate! count : {}", counter.get());
List<DocLibrary> effectiveList = list.stream()
.filter(x -> StringUtils.isEmpty(x.getFileType())).collect(Collectors.toList());
for (DocLibrary doc : effectiveList) {
String fileName = doc.getFileName();
String suffix = StringUtils.substringAfterLast(fileName, ".");
doc.setFileType(suffix);
}
baseMapper.batchUpdate(list);
LOGGER.info("start batchUpdate! count : {}", counter.addAndGet(list.size()));
});
做好 SQL 监控
对于大型 C 端项目,数据量可能很大,一定要做好 SQL 监控,包括 SQL 调用量、QPS、耗时等。可以使用 MyBatis 的拦截器来实现:
@Intercepts({
@Signature(
type = Executor.class,
method = "query",
args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}
),
@Signature(
type = Executor.class,
method = "query",
args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class}
)
})
@Component
public class SQLExecuteCostInterceptor implements Interceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(SQLExecuteCostInterceptor.class);
@Autowired
private PerfHelper perfHelper;
@Override
public Object intercept(Invocation invocation) throws Exception {
Object[] args = invocation.getArgs();
MappedStatement statement = (MappedStatement) args[0];
Object parameter = args[1];
BoundSql boundSql;
if (args.length == 4) {
boundSql = statement.getBoundSql(parameter);
} else {
boundSql = (BoundSql) args[5];
}
StopWatch stopWatch = StopWatch.createStarted();
Object result = invocation.proceed();
// 监控打点
perfHelper.perf("database-name", boundSql.getSql())
.micros(stopWatch.getNanoTime())
.logstash();
LOGGER.info("执行 {}, cost : {}", boundSql.getSql(), stopWatch.getNanoTime());
return result;
}
}
缓存
使用 Redis 加快访问
大厂里肯定没问题,小厂里很多项目几乎不使用 Redis,全是直接读数据库。注意缓存一致性即可,大部分场景使用 Cache Aside 策略即可。
Redis 大 Key 问题
- 大的字符串:比如 1M。
- 集合数据太多:比如 5000 个。
这些都是大 Key 问题,Redis 处理仍然是单线程,大 Key 会拖慢整个 Redis,并且影响带宽。常用解决方案是拆分成多个小 Key,放到多个节点即可。
Redis 热 Key 问题
如果某个 Redis 的 Key 访问量很高,那么这个 Key 就是热 Key,比如我们可以认为该 Key 的 QPS 达到 5000(看业务)那么就是热 Key,解决方案:
- 使用本地缓存:可以是 Guava 的 Cache 也可以是快手开源的全局本地缓存。
- 使用 Memcached:Memcached 抗热点能力比 Redis 好很多。
- 冗余写,随机读:写的时候写多个副本到不同的 Redis 节点,读的时候选其中一个节点读。
缓存击穿
缓存击穿一般发生在缓存过期后大量请求落到数据库,这个缓存不一定是 Redis,也可以是本地缓存。
- 如果是本地缓存:一般使用 Guava 的 Cache,使用
load miss
方法即可,也就是cache.get(key, Callable)
,内部会加锁。 - 如果是 Redis:
- 分布式锁:最容易想到的解决方案。
- CacheSetter 服务:保证相同的 Key 落到同一台服务器,然后使用 JVM 级别的锁去加锁处理。
- 逻辑过期字段:如果能接受一定的脏数据,可以设置缓存永不过期,但要设置一个逻辑过期字段,如果过期了异步加载即可。
String get(String key) {
V v = redis.get(key);
if (v.getLogicTime() < System.currentTimeMillis()) {
String mutexKey = buildMutexKey(key);
if (redis.set(mutexKey, "1", "ex", 100, "nx")) {
executor.execute(() -> { // 重建缓存 });
}
}
return v.getValue();
}
Memcached 双活架构
Memcached 不支持持久化和数据迁移,为了保证可用性,可以使用双活架构,每次写的时候两个 Memcached 集群都写,读的时候读一个,当读取失败的时候再读另外一个并且写入当前的那个。
集群隔离
为了保证业务之间不互相影响,最好做集群隔离,比如任务系统使用 task 集群,奖励系统使用 reward 集群。
多级缓存
如果并发超高,可以考虑使用多级缓存,比如对于活动系统,在后台创建活动然后下发给用户,活动相关的基础信息是不变的,完全可以使用本地缓存 + Redis + MySQL,设置可以加一层全局本地缓存(快手开源)。
消息队列与流量聚合
MQ
MQ(消息队列)主要是用来做异步削峰的,比如直播间点赞、收礼等,如果服务端收到请求后直接同步操作数据库,那么晚高峰的时候对数据库就是灾难。所以必须使用消息队列来处理。包括秒杀、快手抖音发作品等也是类似。
我们需要关注的是消费速度和性能之间的平衡,如果消费过快,那么下游扛不住,如果消费过慢又会导致消息堆积,消费到冷数据并影响业务。
拿 RocketMQ 来举例,消费者从 Broker 拉取的消息是极快的,一般这里不会成为性能瓶颈,往往成为性能瓶颈的是业务的 IO 操作。所以发现消息堆积了的话,先从 IO 那块想办法解决。比如是不是流量增大了导致消费跟不上,这时候可以考虑在不影响性能的情况下调整消费线程数或者消费者扩容。或者是不是一些 IO 接口出问题了。
流量聚合
如果流量超高,并且能接受一定延迟(超高流量大多能接受一定延迟),这时候我们可以考虑使用流量聚合策略,也就是收到消息后存起来,每隔一段时间,或者存的量达到一定阈值的时候再一次性消费。
比如我要统计每天消耗大模型的 token 数,当每次有相关接口调用的时候就将消耗的 token 数发到消息队列,然后在消费者侧进行流量聚合。
再比如,我们使用 Binlog 监听工具用户任务表(分库分表),每次产生用户任务信息我们就往 ES 写,然后在后台我们去 ES 搜索,我们也可以每次监听到消息后进行聚合,最后批量写到 ES。
关于流量聚合工具,可以参考下面的示例:
public class BufferTrigger<E> {
private static final Logger logger = getLogger(BufferTrigger.class);
private final BlockingQueue<E> queue;
private final int batchSize;
private final long lingerMs;
private final ThrowableConsumer<List<E>, Exception> consumer;
private final ScheduledExecutorService scheduledExecutorService;
private final ReentrantLock lock = new ReentrantLock();
private final AtomicBoolean running = new AtomicBoolean();
private BufferTrigger(long lingerMs, int batchSize, int bufferSize,
ThrowableConsumer<List<E>, Exception> consumer) {
this.lingerMs = lingerMs;
this.batchSize = batchSize;
this.queue = new LinkedBlockingQueue<>(max(bufferSize, batchSize));
this.consumer = consumer;
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
// 执行一次调度
this.scheduledExecutorService.schedule(new BatchConsumerRunnable(), this.lingerMs,
MILLISECONDS);
}
public void enqueue(E element) {
try {
queue.put(element);
tryTrigBatchConsume();
} catch (InterruptedException e) {
currentThread().interrupt();
}
}
private void tryTrigBatchConsume() {
if (queue.size() >= batchSize) {
if (lock.tryLock()) {
try {
if (queue.size() >= batchSize) {
if (!running.get()) { // prevent repeat enqueue
this.scheduledExecutorService.execute(this::doBatchConsumer);
running.set(true);
}
}
} finally {
lock.unlock();
}
}
}
}
public void manuallyDoTrigger() {
doBatchConsumer();
}
private void doBatchConsumer() {
lock.lock();
try {
running.set(true);
while (!queue.isEmpty()) {
List<E> toConsumeData = new ArrayList<>(min(batchSize, queue.size()));
queue.drainTo(toConsumeData, batchSize);
if (!toConsumeData.isEmpty()) {
doConsume(toConsumeData);
}
}
} finally {
running.set(false);
lock.unlock();
}
}
private void doConsume(List<E> toConsumeData) {
try {
consumer.accept(toConsumeData);
} catch (Throwable e) {
logger.error("doConsume failed", e);
}
}
private class BatchConsumerRunnable implements Runnable {
@Override
public void run() {
try {
doBatchConsumer();
} finally {
scheduledExecutorService.schedule(this, lingerMs, MILLISECONDS);
}
}
}
public long getPendingChanges() {
return queue.size();
}
public static <E> BufferTriggerBuilder<E> newBuilder() {
return new BufferTriggerBuilder<>();
}
public static class BufferTriggerBuilder<E> {
private int batchSize;
private int bufferSize;
private Duration duration;
private ThrowableConsumer<List<E>, Exception> consumer;
public BufferTriggerBuilder<E> batchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}
public BufferTriggerBuilder<E> bufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
public BufferTriggerBuilder<E> duration(Duration duration) {
this.duration = duration;
return this;
}
public BufferTriggerBuilder<E> consumer(ThrowableConsumer<List<E>, Exception> consumer) {
this.consumer = consumer;
return this;
}
public BufferTrigger<E> build() {
Preconditions.checkArgument(batchSize > 0, "batchSize 必须大于0");
Preconditions.checkArgument(bufferSize > 0, "bufferSize 必须大于0");
Preconditions.checkNotNull(duration, "duration未设置");
Preconditions.checkNotNull(consumer, "消费函数未设置");
return new BufferTrigger<>(duration.toMillis(), batchSize, bufferSize, consumer);
}
}
}
测试示例:
public class BufferTriggerDemo {
private static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
BufferTrigger<Long> trigger = BufferTrigger.<Long>newBuilder()
.batchSize(10)
.bufferSize(100)
.duration(Duration.ofSeconds(5))
.consumer(BufferTriggerDemo::consume)
.build();
for (long i = 0; i < 10000; i++) {
trigger.enqueue(i);
TimeUnit.MILLISECONDS.sleep(100);
}
}
private static void consume(List<Long> list) {
System.out.printf("次数 %d, result : %s \n", counter.incrementAndGet(), list);
}
}
线程池
线程池作为 JDK 的一个重要组件,同时也是性能优化的常客,不得不谈。如果异步,尤其在大模型的场景下,基本上是必不可少的。比如你要请求的两个接口没有关联性,可以考虑使用线程池去并发请求。
对于非核心逻辑,我们也可以使用线程池处理,比如用户完成任务后要给他发个触达(短信、私信等),可以使用线程池去异步处理,当然对可靠性要求高的话我们就使用 MQ。
当然,存在明显的问题就是服务重启就没了,所以对性能要求高的服务还是得使用 MQ。
服务端大多是 IO 型操作,所以可以将线程数调大一点,起始值可以设置为核数 * 2。要设置一个合理的值,可以加上监控,尤其要监控活跃线程数和堆积数,从而来调整一个合适的值,并且帮助快速发现问题。
之前出现一个问题可以作为借鉴,线上大模型相关接口经常卡住,查了半天猜测是线程池里面的任务处理耗时太长,甚至卡住了,导致新的请求进了队列并一直得不到消费,为了验证这个问题,加上了线程池监控,发现确实和猜测的吻合。
如果你使用 RPC,比如 Thrift,那么一定要使用自定义的线程池,并且监控。之前在小爱的时候就出现 RPC 接口在早高峰耗时上涨,究其原因是因为流量上涨,导致线程池处理不过来,跟上面说的问题很类似。
此外,建议给线程池命名,这样在发生问题使用 jstack
来 dump 线程堆栈的时候好分析问题。
[root@turbodesk-api-canary-c9b685f9-ctclm home]# jstack 1 > thread.log
[root@turbodesk-api-canary-c9b685f9-ctclm home]# ll
total 752
-rw-r--r-- 1 root root 769667 Jan 17 17:47 thread.log
[root@turbodesk-api-canary-c9b685f9-ctclm home]# grep java.lang.Thread.State thread.log | awk '{print $2$3$4$5}' | sort | uniq -c
69 RUNNABLE
3 TIMED_WAITING(onobjectmonitor)
8 TIMED_WAITING(parking)
6 TIMED_WAITING(sleeping)
2 WAITING(onobjectmonitor)
706 WAITING(parking)
如果你的接口是间歇性的有大量请求,可以将核心线程数调小一些,避免白创建太多线程处于 waiting 状态。系统能创建的线程数是有限的,如何计算我就不谈了,比如我们的系统最多能创建 1 万个线程,之前有大神的接口,在每次请求的时候创建了只有一个线程的线程池,导致服务经常重启。
总结
本文介绍了多种高并发性能优化技巧,包括 MySQL 的索引和分库分表、缓存的使用、消息队列与流量聚合、线程池的优化等。这些技巧在实际开发中非常有用,可以帮助我们更好地应对高并发场景,提升系统的性能和稳定性。