飞码网-免费源码博客分享网站

点击这里给我发消息

10个需要知道的RxJS函数与示例|-JavaScript教程

飞码网-免费源码博客分享网站 爱上飞码网—https://www.codefrees.com— 飞码网-matlab-python-C++ 爱上飞码网—https://www.codefrees.com— 飞码网-免费源码博客分享网站

随着对函数式反应式编程(FRP)的兴趣的增长,RxJS已经成为该范例中最流行的JavaScript库之一。在本文中,我们将看一看我认为是RxJS的十个必须知道的函数。

注意:前提是要熟悉RxJS的基础知识,如RxJS的函数式反应性编程简介中所述。

反应式编程

响应式编程是一种编程范例,将称为Observables的数据流作为其基本编程单元。

流(或RxJS术语中的Observables)类似于事件侦听器:两者都等待事件发生,并在事件发生时通知您。您从onClick侦听器获得的一系列异步通知是数据流的完美示例。

换句话说,一个Observable就是随时间填充数组

该数组的元素几乎可以来自任何地方:文件系统,DOM事件,API调用,甚至是转换后的同步数据,例如数组。从本质上讲,反应式编程无非就是将Observables用作程序的构建基块。

与数组的关系

数组很简单,因为除非明确更改,否则它们的内容是最终的。从这个意义上讲,数组本质上没有临时性。

另一方面,Observable由时间定义。您对流的了解最多的是,[1, 2, 3]到目前为止,它已经收到了。您不能确定是否会获得(4也不会获得),这是由数据源而不是程序来决定的。

流和数组之间的关系是如此深刻,以至于大多数反应式扩展都来自函数式编程领域,其中列表操作是面包和黄油。

热身RxJS

考虑所有常见的待办事项应用程序。让我们看看使用RxJS来显示仅显示用户未完成任务的名称的问题:

const task_stream =
  // Makes a stream of all the tasks in the database
  getTasks().
    // Get tasks only for this user
    filter((task) => task.user_id == user_id).
    // Get tasks that are incompleted
    filter((task) => !task.completed).
    // Only get name of task
    map((task) => task.name)

/* Tasks look like this:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */

到目前为止,这不过是数组的额外功能,而是展示了反应式编程的功能风格。

通过添加更复杂的“真实世界”功能,它的声明性性质变得更加清晰。假设我们要:

  • 响应用户对查看已完成或未完成任务的选择,启动请求;
  • 仅每秒发送一次对最后选择的请求,以免在用户快速更改选择时浪费带宽;
  • 重试失败的请求,最多三遍;
  • 仅当服务器发送与上次不同的响应时,才重新绘制视图。
const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()

一步步:

 

 

  • parameter_stream告诉我们用户是要完成任务还是要完成任务,将选择存储在中parameter
  • debounce() 确保我们仅关注每秒的最后一次单击按钮;
  • 周围的部分getTasks()与之前相同。
  • distinctUntilChanged()确保仅在服务器的响应与上次不同时才关注我们;
  • update() 负责更新UI以反映我们从服务器获得的内容。

以强制性的,基于回调的方式处理去抖动,重试和“直到更改之前是唯一的”逻辑是有效的,但它既脆弱又复杂。

得出的结论是,使用RxJS进行编程可以实现以下目的:

  1. 声明性程序;
  2. 可扩展的系统;
  3. 简单,强大的错误处理。

我们将通过RxJS的十个必知功能在上例中满足上述示例中的每个功能。

简单流上的操作

简单流(发出诸如字符串之类的简单值)的基本函数包括:

  • 地图()
  • 筛选()
  • 减少()
  • take()/ takeWhile()

除了take()和以外takeWhile(),它们类似于JavaScript的高阶数组函数。

 

我们将通过解决一个示例问题来应用所有这些方法:在数据库中查找所有拥有.com或.org网站的用户,并计算其网站名称的平均长度。

JSONPlaceholder将成为我们的用户来源。这是我们将使用的用户数据的JSON表示形式。

1.使用map()转换数据

使用map()上可观察到的是相同的使用它的阵列上。它:

  1. 接受回调作为参数;
  2. 在调用它的数组的每个元素上执行它;
  3. 返回一个新数组,该数组将原始数组的每个元素替换为对其调用回调的结果。

map()在Observables上使用的唯一区别是:

  1. 它没有返回新数组,而是返回了一个新的Observable。
  2. 它在Observable每次发出新项目时执行,而不是立即全部执行。

我们可以map()用来将用户数据流转换为他们的网站名称列表:

source.
  map((user) => user.website)

请参见RxJS的Pen 10功能//通过CodePen上的SitePoint(@SitePoint)进行映射。

 

在这里,我们曾经用map每个用户的网站“替换”传入流中的每个用户对象。

 

 

RxJS还允许你打电话map()select()这两个名称指的是相同的功能。

2.筛选结果

map()filter()是非常对观测作为数组一样。为了找到每个使用.net或.org网站地址的用户,我们可以这样写:

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'));
})

请参阅RxJS的Pen 10函数// //在CodePen上按SitePoint(@SitePoint)进行过滤。

 

这仅选择其网站以“ net”或“ org”结尾的用户。

filter()也有别名where()

3.使用reduce()收集结果

reduce() 允许我们使用所有单独的值并将其转化为单个结果。

reduce()往往是基本列表操作最令人困惑的地方,因为与filter()不同map(),它的行为因使用而不同。

通常,reduce()将值的集合转换为单个数据点。在我们的案例中,我们将向它提供网站名称流,并用于reduce()将其转换为一个对象,该对象计算我们找到的网站数量以及名称长度的总和。

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  reduce((data, website) => {
    return {
      count       : data.count += 1,
      name_length : data.name_length += website.length
    }
  }, { count : 0, name_length : 0 })

请参阅RxJS的Pen 10函数// //在CodePen上减少SitePoint(@SitePoint)。

 

在这里,我们将流简化为单个对象,该对象跟踪:

  1. 我们看过多少个网站;
  2. 他们所有名字的总长度。

请记住,reduce()仅当源Observable完成时才返回结果。如果您想在每次流接收到新项目时都知道累加器的状态,请scan()改用。

 

 

4.使用take()限制结果

take()takeWhile()完善简单流上的基本功能。

take(n)n从流中读取值,然后取消订阅。

scan()每当我们收到一个网站时,就可以使用我们发出的对象,并且只有take()前两个值。

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  scan((data, website) => {
      return {
        count       : data.count += 1,
        name_length : data.name_length += website.length
      }
    }, { count : 0, name_length : 0 }).
  take(2);

请参阅RxJS的Pen 10函数// //通过CodePen上的SitePoint(@SitePoint)扫描和获取/获取。

 

RxJS还提供takeWhile(),允许您在某些布尔测试成立之前接受值。我们可以这样编写上面的流takeWhile()

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  scan((data, website) => {
    return {
      count       : data.count += 1,
      name_length : data.name_length += website.length
    }
  }, { count : 0, name_length : 0 }).
  takeWhile((data) =>  data.count < 3)

高阶流上的运算

除了它们在Observables而不是数组上工作之外,这些函数与熟悉的list操作几乎相同。

“ [如果您知道如何使用Array#extras对数组进行编程,那么您已经知道如何使用RxJS!” 〜RxJS文档

正如数组可以包含比简单值(例如数组或对象)更复杂的数据一样,可观察对象也可以发出更高阶的数据,例如承诺或其他可观察对象。这是更多专门工具起作用的地方。

5.用flatMap()压缩流

事实上,我们已经在使用一个!

我们作出的呼叫fromPromise()flatMap()当我们定义了source数据流:

const source   =
        // Take a Promise and convert it to an Observable
        Rx.Observable.fromPromise(makeRequest(ENDPOINT))
          // Flatten Promise
          .flatMap(Rx.Observable.from); 

这使用了三台新机器:

  1. 来自Promise;
  2. Rx.Observable.from;
  3. flatMap。

诺言中的可观察到的东西

一个Promise代表一个单个的将来值,我们将异步获取该值,例如,调用服务器的结果。

 

 

Promise的定义特征之一是它仅代表一个未来价值。它不能返回多个异步数据。这就是Observables所做的,并且是两者之间的根本区别。

这意味着,当我们使用时Rx.Observable.fromPromise(),我们得到一个Observable,它发出一个值—

  1. Promise解决的价值;或者
  2. Promise拒绝的错误。

当Promise返回字符串或数字时,我们不需要做任何特殊的事情。但是,当返回数组时(如本例所示),我们更喜欢创建一个Observable来发出数组的内容,而不是将数组本身作为单个值发出。

6.使用flatMap()

此过程称为扁平化,它flatMap()会负责。它有很多重载,但是我们只使用最简单和最常见的重载。

使用时flatMap(),我们:

  1. 调用flatMap()发出单值解决方案或拒绝Promise的Observable;
  2. 将其传递给函数以创建新的Observable。

在我们的例子中,我们通过Rx.Observable.from(),它根据数组的值创建一个序列:

Rx.Observable.from([1, 2, 3]).
  subscribe(
      onNext (value) => console.log(`Next: ${value}`))

// Prints: 
//  Next: 1
//  Next: 2
//  Next: 3

这涵盖了我们的小序幕中的代码:

const source =
  // Create an Observable emitting the VALUE or REJECTION of a Promise...
  Rx.Observable.fromPromise(makeRequest(ENDPOINT))
    // ...And turn it into a new Observable that emits every item of the
    //  array the Promise resolves to.
    .flatMap(Rx.Observable.from)

RxJS也具有的别名flatMap()selectMany()

组成多个流

通常,我们需要将多个流放在一起。合并流的方法有很多,但有几种方法比其他方法要多。

7.将流与concat()和merge()合并

串联和合并是合并流的两种最常用的方法。

串联通过发出第一个流的值直到完成为止,然后发出第二个流的值来创建新的流。

合并通过从活动的任何流中发出值来从许多流中创建一个新的流

想像一下在Facebook Messenger上同时与两个人交谈。concat()是一种情况,您从两个人都收到消息,但是先完成与一个人的对话,然后再回复另一个人。merge()就像创建群聊并同时接收两个消息流一样。

 

 

source1.
  concat(source2).
  subscribe(
    onNext(value) => console.log(`Next: ${value}`))
    // Prints 'Source 1' values first, THEN 'Source 2'

source1.
  merge(source2).
  subscribe(
    onNext(value) => console.log(`Next: ${value}`))
    // INTERLEAVES 'Source 1' and 'Source 2' values

请参阅CodePen上的SitePoint(@SitePoint)的RxJS的Pen 10函数//合并和合并。

 

concat()流将打印所有的值从source1第一个也是唯一开始从印刷值source2source1完成。

merge()流将从打印值source1source2,因为它接收它们:从所述第二发光值之前它不会等待第一流以完整的。

8.使用switch()

通常,我们想听一个Observable发出的Observable,但是只注意从源头发出的最新消息。

为了进一步扩展Facebook Messenger的类比,您switch()就是这种情况。好了,根据当前正在发送消息的人来切换响应的人。

为此,RxJS提供了开关。

用户界面提供了几个好的用例switch()如果我们的应用程序在用户每次选择要搜索的内容时都触发请求,则可以假定他们只想查看最新选择的结果。因此,我们通常switch()只听最新选择的结果。

在进行此操作时,我们应该确保不要浪费带宽,只需要在服务器上敲击用户每秒进行的最后一次选择即可。我们为此使用的功能称为debounce()

如果您想朝另一个方向前进,并且只接受第一个选择,则可以使用油门()。它具有相同的API,但行为相反。

请参阅CodePen上的RxJS的Pen 10函数// //切换,CombineLatest,以及由SitePoint(@SitePoint)进行的distingtUntilChanged。

 

9.协调流

如果我们要允许用户搜索帖子或具有特定ID的用户怎么办?

 

 

为了演示,我们将创建另一个下拉列表,并允许用户选择他们想要检索的项目的ID。

有两种情况。当用户:

  1. 更改任一选择;或者
  2. 更改两个选择。

使用CombineLatest()响应对任一流的更改

在前一种情况下,我们需要创建一个流来触发具有以下内容的网络请求:

  1. 用户最近选择的哪个端点;
  2. 用户最近选择的ID。

并在用户更新任一选择时执行此操作。

combineLatest()是为了:

// User's selection for either POSTS or USERS data
const endpoint_stream = 
  Rx.Observable.fromEvent(select_endpoint, 'click').
    map(event  => event.target).
    map(target => (target.options[target.selectedIndex].text.toLowerCase()));

// Which item ID the user wants to retrieve
const id_stream = 
  Rx.Observable.fromEvent(select_id, 'click').
    map(event  => event.target).
    map(target => (target.options[target.selectedIndex].text));

// Emits a pair of the most recent selections from BOTH streams 
//   when EITHER emits a value
const complete_endpoint_stream = 
  endpoint_stream.combineLatest(id_stream);

请参阅RxJS的Pen 10函数//在CodePen上由SitePoint(@SitePoint)合并及最新和distinctUntilChanged。

 

每当任何一个流发出一个值,combineLatest()取走所发出的值并将其与另一个流所发出的最后一项配对时,就将其发送给数组。

这在图表中更容易可视化:

// stream1 : Emits 1
// stream2 : Emits 1

combined : Emits [1, 1]

// stream2: Emits 2

combined : Emits [1, 2]

// stream2: Emits 3

combined : Emits [1, 3]

仅使用zip响应两个流中的更改

要等到用户更新ID和终结点字段的选择,请替换combineLatest()zip()

同样,通过图表更容易理解:

// stream1 : Emits A
// stream2 : Emits 1
zipped : Emits [A, 1]

// stream2: Emits 2
zipped : Emits NOTHING

// stream2: Emits 3
zipped : Emits NOTHING

// stream1: Emits B
zipped : Emits [B, 2]

// stream1: Emits C
zipped : Emits [C, 3]

不像combineLatest(),要zip()等到两个Observable都发出新的东西之前,才发出其更新值的数组。

10. takeUntil

最后,takeUntil()允许我们侦听第一个流,直到第二个流开始发出值。

source1.
  takeUntil(source2);

当您需要协调流而不一定要合并它们时,这很有用。

总结

向数组添加时间维度的简单事实为全新的程序思考方式打开了大门。

RxJS的功能远不止我们在这里看到的,但这远远超出了我们的预期。

飞码网-免费源码博客分享网站 爱上飞码网—https://www.codefrees.com— 飞码网-matlab-python-C++ 爱上飞码网—https://www.codefrees.com— 飞码网-免费源码博客分享网站
赞 ()

相关推荐

内容页底部广告位3
留言与评论(共有 0 条评论)
   
验证码: