什么是异步
我的理解,异步的概念是相对同步而言的,同步和异步在并发编程上也有对应的概念,对应的是串行和并行,并发编程上考虑的更微观是在线程上考虑,设计范围局限在语言层面。更宏观点的概念,同步是实时响应的过程,而异步则非实时。
异步设计
异步设计包括有两方面的内容
系统级别的异步通讯
优点:系统之间解耦解决吞吐量问题、支持一对多通讯(同步是一对一)、避免故障的连锁效应。
请求响应方式
发送方请求接收方,接收方收到后直接返回[收到,正在处理中]
发送方怎么获得结果?
1、 发送方发起轮询或事件触发,查询接收方是否处理完成;
2、 发送方给接收方注册一个事件,接收方处理完成后回调注册事件。1
2之前做过一个支付的设计,支付模块是外部系统,APP支付会跳到支付页面,其中需要带上通知
支付结果的接口,支付完成后回到原来页面,支付模块调用参数的接口回传支付状态给后台。订阅方式
接收方订阅发送方消息,发送方通过队列发送消息,接收方从队列获取消息,接收方获取消息后进行处理,完成后通知发送方成功处理;- broker方式
订阅方式已经可以解决异步通讯问题,但依然有耦合存在,无法做到通用;在这个基础上出现了broker中间件,类似一个数据总线的东西,接收方和发送方只依赖总线,broker独立可以做到高可用性、易于水平扩展、持久化、权限配置、处理性能的监控;
我最熟悉的broker中间件:kafka
这种机制的缺点:
1、问题定位需要发送方和接收方同时参与才能定位,增加了沟通成本;
2、发送方回传结果变成非实时,消息量太大时候会造成回传延迟严重;
3、消息乱序问题,在一些有状态的消息处理中会存在问题,需要引入新的控制乱序逻辑;
TODO: OMS 之前订单状态回传怎么解决乱序问题的?
扩展
broker方式可以解决响应速度问题,例如下单请求过来,做完必要检查后返回下单成功,然后将订单消息通过kafka
下发给结算、支付和调度模块,这是对请求进行削峰处理
在实际开发中遇到过一种需要进行削峰的场景,将历史已完成的订单数据,导入到系统中,并发送给结算模块进行结算
,发送结算模块使用kafka,导入时刻一瞬间几万个消息发送,导致下游处理性能瓶颈同时影响其他功能,后来采用先
入库,设定一个时间范围,将所有记录随机散落到这个时间段内,定时任务加载到点的数据进行下发kafka从而达到削
峰的作用
任务级别的异步处理
Push/Pull 模型
push: 通过任务调度调度分配任务给节点,需要知道节点的信息,以及节点的健康状态,复杂度高;
pull: 节点主动获取任务进行处理,
分布式事务
- 异步处理出现a和b两个阶段,需要保存一个凭证,保证b能处理到;
- 凭证处理的幂等性问题,不然在重试时就会出现多次操作的情况;
- 如果事务完成不了,需要做补偿事务处理。
入库+定时任务的异步设计
以往最常见的异步方式,将数据放入数据库,然后启动定时任务扫表进行处理,这里需要解决节点扫表并发问题
模值处理
通过模值将业务数据切分到各个节点,节点定时器运行只归属于自己要跑的数据,数据是有规律的,如果是无规律的会导致分布不均而负载不均衡,导致某个节点压力很大,某个节点资源空闲浪费;TODO 模式的处理方法(例如单号怎么进行模值)
控制表加锁
刚毕业时候做过这样的逻辑,添加一个控制表,加一个标识列,节点任务定时运行时候,update这个标识,如果返回大于0则获得锁,再进行数据表的任务处理,处理完将标识位恢复;如果update失败,则等待下一个循环时间再尝试;这种处理利用了update的排它锁 (TODO 数据库的锁问题)。
Redis延迟队列设计
解决场景
- 1、异步方式解决吞吐量问题,把请求都接收下来,然后再做处理
- 2、定时检测超时的场景(例如未支付超时进行状态回滚)
将延迟时间转换为redis有序集合score
{id, delayTime/delayDate} –> [id, score]
1 |
|
轮询线程从队列中取任务
1 |
|
分布式锁控制并发
1、通过setnx设置标识,没有成功则等待
2、获取到标识,则执行任务
3、这个设计有个问题:id成cache中移除是在线程池中的,如果任务太慢会导致来不及删除就unlock,会出现多次执行的问题(FIXME 改进方案?)
4、TODO 分布式锁分析1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54/**
* 在redis中获取一个锁,并执行一个任务,最后释放锁
*
* @param cache redisCache
* @param lockName 锁名称
* @param timeout 锁过期时间,秒
* @param task 任务
*/
public static void lock(ICache cache, String lockName, int timeout, Runnable task) {
while (!lock(cache, lockName, timeout)) {
sleep(500);
}
try {
task.run();
} finally {
unlock(cache, lockName);
}
}
/**
* 在redis中获取一个锁
*
* @param cache redisCache
* @param lockName 锁名称
* @param timeout 锁过期时间,秒
* @return
*/
public static boolean lock(ICache cache, String lockName, int timeout) {
try {
String key = LOCK_PREFIX + lockName;
boolean flag = cache.setnx(key, LOCK_VALUE, timeout);
if (!flag && (cache.ttl(key) == -1)) {
cache.setTtl(key, timeout);
}
return flag;
} catch (Exception e) {
logger.warn("get lock [" + lockName + "] error.", e);
return false;
}
}
/**
* 释放锁
*
* @param cache
* @param lockName
*/
public static void unlock(ICache cache, String lockName) {
try {
String key = LOCK_PREFIX + lockName;
cache.remove(key);
} catch (Exception e) {
logger.warn("release lock [" + lockName + "] error", e);
}
}
长时间任务的异步队列
背景
数据导出是一个很常见的场景,为了避免用户等待,需要异步进行数据导出,导出后发送给用户或用户在页面上下载;
需要解决的问题点
1、异步逻辑: 请求信息入库,id放入队列排队,节点从队列中获取任务;
2、节点并发: 节点从队列中获取任务然后将任务移除队列,这个操作需要同步,即事务处理,类似加锁
3、节点负载均衡: 设定每个节点同时可容纳的任务个数,达到上限后停止从队列中取任务
Redis的list
我们要实现的其实就是一个先进先出的队列,Redis的list结构正好满足,推入和弹出命令等同于放入和取出任务,这两个命令都属于原子性操作,满足同步的需求,也可以使用阻塞式的命令来等待元素,我这里只用到了lpop和rpush命令。
实现代码
- 节点轮询获取任务
1 |
|
- 检查当前运行任务数是否未到达阈值
1 | /** |
- 丢入线程池中运行任务
1 | /** |
ThreadPoolManager线程池模块GitHub上有一个实现,做到了管理和监控线程池状态,我修改添加了一个线程池内线程运行任务的的计数器,见GitHub。
优先级队列
异步设计会带来延迟问题,请求和数据越多延迟越大,我们希望对于某些紧急重要的任务优先处理,所以我们需要设计优先级队列。
思路演进 (FIXME 添加图解)
设置优先级,或动态调整队列中的元素顺序
思路1:Redis有序集合,根据某种优先级算法计算score,score大小影响元素的顺序,达到score大的元素优先处理掉的目的;
思路2:多个List(还是redis),例如根据优先级划分为高中低三个list, 先处理完高优先级的list,再处理低优先级的list;上面两个方案都有可能会导致低优先级的元素饿死,迟迟无法处理
思路2的改进:对高中低三个级别进行时间片的分配,例如对高级别list处理3个,中级别list处理2个,低级别list处理1个,循环这个处理规则;
优先级层次太多,例如有1~100,list太多了?
结合上面思路的改进:将100个等级划分为几段,例如高分段(1~30),中分段(31~60),低分段(61~100),每个分段用一个有序集合表示,有序集合内部的score代表一个分段内的排序规则,从而将问题简化为上面的高中低时间片规则。
实现代码 (FIXME)
Java8异步事件驱动编程 FIXME
eventbus的异步总线队列 FIXME
关于[有赞的延迟队列设计]
对“有赞延迟队列设计”的学习和实现 FIXME
原文链接