RxJS 6.xのpipeを使って処理を書く + Subjectを使ってみる

Qiitaで見ていたRxJSのサンプルがRxJS 6.x以降だと書き方が変わるみたいです。 operatorはobservable1.pipe(map(…),reduce(…)).subscribe(…);の […]

広告ここから
広告ここまで

目次

    Qiitaで見ていたRxJSのサンプルがRxJS 6.x以降だと書き方が変わるみたいです。

    operatorはobservable1.pipe(map(...),reduce(...)).subscribe(...);のように書く

    import文をimport { Observable, of } from 'rxjs';のように書く

    operatorのimportは import { map, reduce } from 'rxjs/operators';のように書く

    https://qiita.com/agajo/items/7942743a0130f7a0f30b#%E3%82%AA%E3%83%9A%E3%83%AC%E3%83%BC%E3%82%BF%E3%81%8C%E4%BE%BF%E5%88%A9

    ということでサンプルで紹介されていたコードをRxJS 6.x以降の構文に書き換えてみます。

    元のコード

    coimport { interval } from 'rxjs'
    interval(1000)
      .filter(x => x%2 === 0)
      .scan((acc,curr) => acc+curr,0)
      .map(x => String(x)+"!!")
      .subscribe(x => console.log(x));

    このままコンパイル書けるとめっちゃエラーが出ます。

    TSError: ⨯ Unable to compile TypeScript:
    libs/index.ts:5:4 - error TS2339: Property 'filter' does not exist on type 'Observable<number>'.
    
    5   .filter(x => x%2 === 0)
         ~~~~~~
    libs/index.ts:5:11 - error TS7006: Parameter 'x' implicitly has an 'any' type.
    
    5   .filter(x => x%2 === 0)
                ~
    libs/index.ts:6:10 - error TS7006: Parameter 'acc' implicitly has an 'any' type.
    
    6   .scan((acc,curr) => acc+curr,0)
               ~~~
    libs/index.ts:6:14 - error TS7006: Parameter 'curr' implicitly has an 'any' type.
    
    6   .scan((acc,curr) => acc+curr,0)
                   ~~~~
    libs/index.ts:7:8 - error TS7006: Parameter 'x' implicitly has an 'any' type.
    
    7   .map(x => String(x)+"!!")
             ~
    libs/index.ts:8:14 - error TS7006: Parameter 'x' implicitly has an 'any' type.
    
    8   .subscribe(x => console.log(x));
                   ~

    RxJS 6.xの構文で書き直す

    メソッドチェーンにしていたものをpipeの中にかけば良い様子です。

    import { interval } from 'rxjs'
    import { map, scan, filter } from 'rxjs/operators'
    
    interval(1000)
      .pipe(
        filter(x => x%2 === 0),
        scan((acc, curr) => acc + curr, 0),
        map(x => `${x}!!`)
      )
      .subscribe(x => console.log(x))

    これで動きました。

    $ yarn test
    yarn run v1.16.0
    $ ./node_modules/.bin/ts-node libs/index.ts
    0!!
    2!!
    6!!
    12!!
    ^C

    せっかくなので、もうちょっと遊んでみる

    このコードを使ってもうちょっといろいろ書いてみましょう。

    時間指定で停止する

    Qiitaのサンプルにも記載がありますが、これもやってみましょう。

    Observableをsubscribeした時の戻り値を利用してunsubscribeすれば停止できます。

    const subscription = interval(1000)
      .pipe(
        filter(x => x%2 === 0),
        scan((acc, curr) => acc + curr, 0),
        map(x => `${x}!!`)
      )
      .subscribe(x => console.log(x))
    
    setTimeout(() => {
      subscription.unsubscribe()
      console.log('end')
    }, 10000)

    これで10秒後停止し、メッセージを出して終了できました。

    $ ./node_modules/.bin/ts-node libs/index.ts
    0!!
    2!!
    6!!
    12!!
    20!!
    end
    ✨  Done in 11.92s.

    Subjectを使って並列に処理を実行する

    先程の1秒毎に1増えるObserverにSubjectを渡して並列で複数の作業を実行してみます。

    import { Subject, interval } from 'rxjs'
    import { scan, filter } from 'rxjs/operators'
    
    const subject = new Subject<number>()
    // 偶数だけ出力する
    subject.pipe(
      filter((x:number) => x%2 === 0)
    ).subscribe(x => console.log(`偶数: ${x}`))
    
    // 奇数だけ出力する
    subject.pipe(
      filter((x:number) => x%2 !== 0)
    ).subscribe(x => console.log(`奇数: ${x}`))
    
    // ひたすら足していくだけ
    subject
    .pipe(
      scan((acc, curr) => acc + curr, 0)
    ).subscribe(x => console.log(`合算: ${x}`))
    
    // 区切り
    subject.subscribe(() => console.log(`\n`))
    
    const subscription = interval(1000).subscribe(subject)
    
    // 10秒経過で終了
    setTimeout(() => {
      subscription.unsubscribe()
      console.log('end')
    }, 10000)

    実行すると、以下のような結果が出てきます。

    $ ./node_modules/.bin/ts-node libs/index.ts
    偶数: 0
    合算: 0
    
    
    奇数: 1
    合算: 1
    
    
    偶数: 2
    合算: 3
    
    
    奇数: 3
    合算: 6
    
    
    偶数: 4
    合算: 10
    
    
    奇数: 5
    合算: 15
    
    
    偶数: 6
    合算: 21
    
    
    奇数: 7
    合算: 28
    
    
    偶数: 8
    合算: 36
    
    
    end
    ✨  Done in 12.16s.

    intervalで生成された数字を、各subjectが処理して出力していくイメージですね。

    APIやユーザー入力の値を並列で処理したい時などに使うとよいのかな。

    広告ここから
    広告ここまで
    Home
    Search
    Bookmark