RxJs下的多请求异步编程指南
背景
在工作中,遇到这样一个场景,需要发起多个异步请求,这些请求中,可能串联发起,也可能并行发起,有时还需要根据上一个请求的返回决定是否发起下一个请求。目前的处理方式是依据async/await
关联起整个业务逻辑,代码写的并不优雅,并且易读性也不好。本文即在此背景下做出的关于此问题的优化尝试。
RxJS简介
根据官方文档介绍,RxJS是一种基于可观测数据流Stream结合观察者模式和迭代器模式的异步编程范式,它的出现使编写异步或基于回调的代码更容易,因此也是优化本文问题的一个选择。此外,任何东西都可以是一个Stream:变量、用户输入、属性、Cache、数据结构等,可以对此监听并作出响应,在此基础上,还可以通过combine、filter、map等操作符进行各种操作。
本文不对RxJS本身做更多的介绍,如需了解RxJS的更多内容,请参考官方文档:https://rxjs.dev/。
准备工作
在介绍RxJS如何进行串联或并联请求之前,先做一些准备工作。模拟用户是否开通请求、是否可以开通等。
模拟用户是否开通
const openConsultPromise = (ms) => {
return new Promise(resolve => {
setTimeout(() => resolve({
open: false,
}), ms);
});
};
通过setTimeout()
模拟请求时长,返回一个promise
实例。
模拟用户是否可以开通
const admitPromise = (ms) => {
return new Promise(resolve => {
setTimeout(() => resolve({
admit: true,
}), ms);
});
};
利用RxJS处理Promise
注:本文示例基于 rxjs 6.2.0版本。
在RxJS中,所有的数据都可以转换为 Observable 对象。Promise 可以通过 from()
函数进行转换,比如上述的用户是否开通请求,转换为 Observable 对象的方式如下:
const { from } = Rx;
from(openConsultPromise(200))
Observable 对象都可以订阅,订阅方式如下:
const { from } = Rx;
from(openConsultPromise(200)).subscribe(openConsultResult => console.log(openConsultResult));
// 输出如下:
// {
// "open": false
// }
串联发起多个请求
发起多个请求,请参考如下代码,说明请参考注释。
const { from, operators } = Rx;
const { map, mergeMap } = operators;
from(openConsultPromise(200))
// map 操作符,用于做数据处理
.pipe(map(openConsultResult => openConsultResult.open))
// 通过 mergeMap 可以关联起多个请求
.pipe(mergeMap((open) => from(admitPromise(200)).pipe(map((admitResult) => {
return {
open,
admit: admitResult.admit,
};
}))))
.subscribe(result => console.log(result));
我们面临的需求是:用户未开通的请求下,再查询用户是否可以开通。这时,可以利用 filter()
操作符过滤。
from(openConsultPromise(200))
.pipe(
map(openConsultResult => openConsultResult.open),
filter(open => !open),
)
.pipe(
mergeMap((open) => from(admitPromise(200)).pipe(map((admitResult) => {
return {
open,
admit: admitResult.admit,
};
}))),
)
.subscribe(result => console.log(result));
也可以利用iif
操作符根据不同的条件做不同的处理。
from(openConsultPromise(200))
.pipe(map(openConsultResult => openConsultResult.open))
.pipe(mergeMap(open => iif(
() => open,
of({ open }),
from(admitPromise(200)).pipe(map((admitResult) => {
return {
open,
admit: admitResult.admit,
};
})
))))
.subscribe(result => {
console.log('result: ', result);
});
并行发起多个请求
并行发起多个请求,利用 forkJoin()
即可。
const { from } = Rx;
forkJoin([openConsultPromise(10), admitPromise(1000)]).subscribe(x => console.log(x));
总结
本文是对RxJS如何进行异步请求编程的简单总结,更多使用请参考RxJS官方文档。学习RxJS的过程中,其中最为关键的是理解RxJS的基本概念。
参考文档
• RxJS官方文档:https://rxjs.dev/
• RP入门:https://github.com/benjycui/introrx-chinese-edition
• 响应式编程入门指南:https://hijiangtao.github.io/2020/01/13/RxJS-Introduction-and-Actions/