RxJs下的多请求异步编程指南

背景

在工作中,遇到这样一个场景,需要发起多个异步请求,这些请求中,可能串联发起,也可能并行发起,有时还需要根据上一个请求的返回决定是否发起下一个请求。目前的处理方式是依据async/await关联起整个业务逻辑,代码写的并不优雅,并且易读性也不好。本文即在此背景下做出的关于此问题的优化尝试。

RxJS简介

根据官方文档介绍,RxJS是一种基于可观测数据流Stream结合观察者模式和迭代器模式的异步编程范式,它的出现使编写异步或基于回调的代码更容易,因此也是优化本文问题的一个选择。此外,任何东西都可以是一个Stream:变量、用户输入、属性、Cache、数据结构等,可以对此监听并作出响应,在此基础上,还可以通过combine、filter、map等操作符进行各种操作。

本文不对RxJS本身做更多的介绍,如需了解RxJS的更多内容,请参考官方文档:https://rxjs.dev/。

准备工作

在介绍RxJS如何进行串联或并联请求之前,先做一些准备工作。模拟用户是否开通请求、是否可以开通等。

模拟用户是否开通

const openConsultPromise = (ms) => {
  return new Promise(resolve => {
    setTimeout(() => resolve({
      open: false,
    }), ms);
  });
};

通过setTimeout()模拟请求时长,返回一个promise实例。

模拟用户是否可以开通

const admitPromise = (ms) => {
  return new Promise(resolve => {
    setTimeout(() => resolve({
      admit: true,
    }), ms);
  });
};

利用RxJS处理Promise

注:本文示例基于 rxjs 6.2.0版本。

在RxJS中,所有的数据都可以转换为 Observable 对象。Promise 可以通过 from() 函数进行转换,比如上述的用户是否开通请求,转换为 Observable 对象的方式如下:

const { from } = Rx;
from(openConsultPromise(200))

Observable 对象都可以订阅,订阅方式如下:

const { from } = Rx;
from(openConsultPromise(200)).subscribe(openConsultResult => console.log(openConsultResult));
// 输出如下:
// {
//  "open": false
// }

串联发起多个请求

发起多个请求,请参考如下代码,说明请参考注释。

const { from, operators } = Rx;
const { map, mergeMap } = operators;
from(openConsultPromise(200))
  // map 操作符,用于做数据处理
  .pipe(map(openConsultResult => openConsultResult.open))
  // 通过 mergeMap 可以关联起多个请求
  .pipe(mergeMap((open) => from(admitPromise(200)).pipe(map((admitResult) => {
    return {
      open,
      admit: admitResult.admit,
    };
  }))))
  .subscribe(result => console.log(result));

我们面临的需求是:用户未开通的请求下,再查询用户是否可以开通。这时,可以利用 filter() 操作符过滤。

from(openConsultPromise(200))
  .pipe(
    map(openConsultResult => openConsultResult.open),
    filter(open => !open),
  )
  .pipe(
    mergeMap((open) => from(admitPromise(200)).pipe(map((admitResult) => {
      return {
        open,
        admit: admitResult.admit,
      };
    }))),
  )
  .subscribe(result => console.log(result));

也可以利用iif操作符根据不同的条件做不同的处理。

from(openConsultPromise(200))
  .pipe(map(openConsultResult => openConsultResult.open))
  .pipe(mergeMap(open => iif(
    () => open,
    of({ open }),
    from(admitPromise(200)).pipe(map((admitResult) => {
      return {
        open,
        admit: admitResult.admit,
      };
    })
  ))))
  .subscribe(result => {
    console.log('result: ', result);
  });

并行发起多个请求

并行发起多个请求,利用 forkJoin() 即可。

const { from } = Rx;
forkJoin([openConsultPromise(10), admitPromise(1000)]).subscribe(x => console.log(x));

总结

本文是对RxJS如何进行异步请求编程的简单总结,更多使用请参考RxJS官方文档。学习RxJS的过程中,其中最为关键的是理解RxJS的基本概念。

参考文档

• RxJS官方文档:https://rxjs.dev/
• RP入门:https://github.com/benjycui/introrx-chinese-edition
• 响应式编程入门指南:https://hijiangtao.github.io/2020/01/13/RxJS-Introduction-and-Actions/