异步设计

什么是异步

我的理解,异步的概念是相对同步而言的,同步和异步在并发编程上也有对应的概念,对应的是串行和并行,并发编程上考虑的更微观是在线程上考虑,设计范围局限在语言层面。更宏观点的概念,同步是实时响应的过程,而异步则非实时。

异步设计

异步设计包括有两方面的内容

系统级别的异步通讯

优点:系统之间解耦解决吞吐量问题、支持一对多通讯(同步是一对一)、避免故障的连锁效应。

  • 请求响应方式
    发送方请求接收方,接收方收到后直接返回[收到,正在处理中]
    发送方怎么获得结果?
    1、 发送方发起轮询或事件触发,查询接收方是否处理完成;
    2、 发送方给接收方注册一个事件,接收方处理完成后回调注册事件。

    1
    2
    之前做过一个支付的设计,支付模块是外部系统,APP支付会跳到支付页面,其中需要带上通知
    支付结果的接口,支付完成后回到原来页面,支付模块调用参数的接口回传支付状态给后台。
  • 订阅方式
    接收方订阅发送方消息,发送方通过队列发送消息,接收方从队列获取消息,接收方获取消息后进行处理,完成后通知发送方成功处理;

  • broker方式
    订阅方式已经可以解决异步通讯问题,但依然有耦合存在,无法做到通用;在这个基础上出现了broker中间件,类似一个数据总线的东西,接收方和发送方只依赖总线,broker独立可以做到高可用性、易于水平扩展、持久化、权限配置、处理性能的监控;
    我最熟悉的broker中间件:kafka
    broker
    这种机制的缺点:
    1、问题定位需要发送方和接收方同时参与才能定位,增加了沟通成本;
    2、发送方回传结果变成非实时,消息量太大时候会造成回传延迟严重;
    3、消息乱序问题,在一些有状态的消息处理中会存在问题,需要引入新的控制乱序逻辑;

TODO: OMS 之前订单状态回传怎么解决乱序问题的?

扩展

broker方式可以解决响应速度问题,例如下单请求过来,做完必要检查后返回下单成功,然后将订单消息通过kafka
下发给结算、支付和调度模块,这是对请求进行削峰处理
在实际开发中遇到过一种需要进行削峰的场景,将历史已完成的订单数据,导入到系统中,并发送给结算模块进行结算
,发送结算模块使用kafka,导入时刻一瞬间几万个消息发送,导致下游处理性能瓶颈同时影响其他功能,后来采用先
入库,设定一个时间范围,将所有记录随机散落到这个时间段内,定时任务加载到点的数据进行下发kafka从而达到削
峰的作用

任务级别的异步处理

Push/Pull 模型

push: 通过任务调度调度分配任务给节点,需要知道节点的信息,以及节点的健康状态,复杂度高;
pull: 节点主动获取任务进行处理,

分布式事务

  • 异步处理出现a和b两个阶段,需要保存一个凭证,保证b能处理到;
  • 凭证处理的幂等性问题,不然在重试时就会出现多次操作的情况;
  • 如果事务完成不了,需要做补偿事务处理。

入库+定时任务的异步设计

以往最常见的异步方式,将数据放入数据库,然后启动定时任务扫表进行处理,这里需要解决节点扫表并发问题

模值处理

通过模值将业务数据切分到各个节点,节点定时器运行只归属于自己要跑的数据,数据是有规律的,如果是无规律的会导致分布不均而负载不均衡,导致某个节点压力很大,某个节点资源空闲浪费;TODO 模式的处理方法(例如单号怎么进行模值)

控制表加锁

刚毕业时候做过这样的逻辑,添加一个控制表,加一个标识列,节点任务定时运行时候,update这个标识,如果返回大于0则获得锁,再进行数据表的任务处理,处理完将标识位恢复;如果update失败,则等待下一个循环时间再尝试;这种处理利用了update的排它锁 (TODO 数据库的锁问题)。

Redis延迟队列设计

解决场景

  • 1、异步方式解决吞吐量问题,把请求都接收下来,然后再做处理
  • 2、定时检测超时的场景(例如未支付超时进行状态回滚)

将延迟时间转换为redis有序集合score

{id, delayTime/delayDate} –> [id, score]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void addDelayTask(String id, double delayTime) {
if (delayTime <= 0) {
executeTask(id);
} else {
double score = System.currentTimeMillis() + delayTime * 1000;
cache.zadd(keyName, id, score);
}
}

@Override
public void addExpireTask(String id, Date delayDate) {
double score = delayDate.getTime();
cache.zadd(keyName, taskId, score);
}

轮询线程从队列中取任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void run() {
while (!isInterrupted()) {
try {
waitTime(taskInterval); // 等待防止空循环耗资源
// lock 进行多节点并发控制
RedisLockUtils.lock(cache, queueName, lockTimeout, new Runnable() {
@Override
public void run() {
// 取到点任务
Set<String> ids = cache.zrangeByScore(keyName, 0, System.currentTimeMillis());
if (ids != null) {
ids.forEach(id -> {
// 具体执行任务可以丢到一个线程池中运行
executeTask(id);
});
}
}
});
} catch (Exception e) {
logger.error("delay task [" + queueName + "] error.", e);
}
}
}

分布式锁控制并发

1、通过setnx设置标识,没有成功则等待
2、获取到标识,则执行任务
3、这个设计有个问题:id成cache中移除是在线程池中的,如果任务太慢会导致来不及删除就unlock,会出现多次执行的问题(FIXME 改进方案?)
4、TODO 分布式锁分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* 在redis中获取一个锁,并执行一个任务,最后释放锁
*
* @param cache redisCache
* @param lockName 锁名称
* @param timeout 锁过期时间,秒
* @param task 任务
*/
public static void lock(ICache cache, String lockName, int timeout, Runnable task) {
while (!lock(cache, lockName, timeout)) {
sleep(500);
}

try {
task.run();
} finally {
unlock(cache, lockName);
}
}
/**
* 在redis中获取一个锁
*
* @param cache redisCache
* @param lockName 锁名称
* @param timeout 锁过期时间,秒
* @return
*/
public static boolean lock(ICache cache, String lockName, int timeout) {
try {
String key = LOCK_PREFIX + lockName;
boolean flag = cache.setnx(key, LOCK_VALUE, timeout);
if (!flag && (cache.ttl(key) == -1)) {
cache.setTtl(key, timeout);
}
return flag;
} catch (Exception e) {
logger.warn("get lock [" + lockName + "] error.", e);
return false;
}
}
/**
* 释放锁
*
* @param cache
* @param lockName
*/
public static void unlock(ICache cache, String lockName) {
try {
String key = LOCK_PREFIX + lockName;
cache.remove(key);
} catch (Exception e) {
logger.warn("release lock [" + lockName + "] error", e);
}
}

长时间任务的异步队列

背景

数据导出是一个很常见的场景,为了避免用户等待,需要异步进行数据导出,导出后发送给用户或用户在页面上下载;

需要解决的问题点

1、异步逻辑: 请求信息入库,id放入队列排队,节点从队列中获取任务;
2、节点并发: 节点从队列中获取任务然后将任务移除队列,这个操作需要同步,即事务处理,类似加锁
3、节点负载均衡: 设定每个节点同时可容纳的任务个数,达到上限后停止从队列中取任务

Redis的list

我们要实现的其实就是一个先进先出的队列,Redis的list结构正好满足,推入和弹出命令等同于放入和取出任务,这两个命令都属于原子性操作,满足同步的需求,也可以使用阻塞式的命令来等待元素,我这里只用到了lpop和rpush命令。

实现代码

  • 节点轮询获取任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void run() {
try {
// 循环等待获取队列元素
while (!isInterrupted()) {
if (checkEnoughCapacity()) {
String element = pop(); // cache.pop(queueName); 需要异常捕获
if (StringUtils.isNotBlank(element)) {
runTask(element);
} else {
waitTime(WAIT_INTERVAL); // 未取出元素 等待WAIT_INTERVAL后重试
}
} else {
waitTime(WAIT_INTERVAL); // 阈值已满 等待WAIT_INTERVAL后重试
}
}
} catch (Exception e) {
logger.error("async task [" + queueName + "] error.", e);
}
}
  • 检查当前运行任务数是否未到达阈值
1
2
3
4
5
6
7
8
9
/**
*
* @return
*/
public boolean checkEnoughCapacity() {
ThreadPool pool = ThreadPoolManager.getSingleton().getThreadPool();
int currentTaskNum = pool.getSubmittedTaskCount(threadPoolName);
return currentTaskNum < threshold;
}
  • 丢入线程池中运行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 运行任务
*
* @param json
*/
public void runTask(String json) {
logger.info("取出队列元素{}", json);
ThreadPool pools = ThreadPoolManager.getSingleton().getThreadPool();
pools.submit(new Runnable() {
public void run() {
try {
QueueMember memeber = JsonUtils.json2Object(json, QueueMember.class);
handler.runTask(memeber);
} catch (Exception e) {
logger.error("运行任务失败:{}", e);
}
}
}, threadPoolName);
}

ThreadPoolManager线程池模块GitHub上有一个实现,做到了管理和监控线程池状态,我修改添加了一个线程池内线程运行任务的的计数器,见GitHub

优先级队列

异步设计会带来延迟问题,请求和数据越多延迟越大,我们希望对于某些紧急重要的任务优先处理,所以我们需要设计优先级队列。

思路演进 (FIXME 添加图解)

  • 设置优先级,或动态调整队列中的元素顺序

    思路1:Redis有序集合,根据某种优先级算法计算score,score大小影响元素的顺序,达到score大的元素优先处理掉的目的;
    思路2:多个List(还是redis),例如根据优先级划分为高中低三个list, 先处理完高优先级的list,再处理低优先级的list;

  • 上面两个方案都有可能会导致低优先级的元素饿死,迟迟无法处理

    思路2的改进:对高中低三个级别进行时间片的分配,例如对高级别list处理3个,中级别list处理2个,低级别list处理1个,循环这个处理规则;

  • 优先级层次太多,例如有1~100,list太多了?

    结合上面思路的改进:将100个等级划分为几段,例如高分段(1~30),中分段(31~60),低分段(61~100),每个分段用一个有序集合表示,有序集合内部的score代表一个分段内的排序规则,从而将问题简化为上面的高中低时间片规则。

实现代码 (FIXME)

Java8异步事件驱动编程 FIXME

eventbus的异步总线队列 FIXME

关于[有赞的延迟队列设计]

对“有赞延迟队列设计”的学习和实现 FIXME
原文链接