RxJS 6.xのpipeを使って処理を書く + Subjectを使ってみる
Qiitaで見ていたRxJSのサンプルがRxJS 6.x以降だと書き方が変わるみたいです。 operatorはobservable1.pipe(map(…),reduce(…)).subscribe(…);の […]
目次
Qiitaで見ていたRxJSのサンプルがRxJS 6.x以降だと書き方が変わるみたいです。
operatorは
observable1.pipe(map(...),reduce(...)).subscribe(...);
のように書くimport文を
import { Observable, of } from 'rxjs';
のように書くoperatorのimportは
import { map, reduce } from 'rxjs/operators';
のように書く
ということでサンプルで紹介されていたコードをRxJS 6.x以降の構文に書き換えてみます。
元のコード
coimport { interval } from 'rxjs'
interval(1000)
.filter(x => x%2 === 0)
.scan((acc,curr) => acc+curr,0)
.map(x => String(x)+"!!")
.subscribe(x => console.log(x));
このままコンパイル書けるとめっちゃエラーが出ます。
TSError: ⨯ Unable to compile TypeScript:
libs/index.ts:5:4 - error TS2339: Property 'filter' does not exist on type 'Observable<number>'.
5 .filter(x => x%2 === 0)
~~~~~~
libs/index.ts:5:11 - error TS7006: Parameter 'x' implicitly has an 'any' type.
5 .filter(x => x%2 === 0)
~
libs/index.ts:6:10 - error TS7006: Parameter 'acc' implicitly has an 'any' type.
6 .scan((acc,curr) => acc+curr,0)
~~~
libs/index.ts:6:14 - error TS7006: Parameter 'curr' implicitly has an 'any' type.
6 .scan((acc,curr) => acc+curr,0)
~~~~
libs/index.ts:7:8 - error TS7006: Parameter 'x' implicitly has an 'any' type.
7 .map(x => String(x)+"!!")
~
libs/index.ts:8:14 - error TS7006: Parameter 'x' implicitly has an 'any' type.
8 .subscribe(x => console.log(x));
~
RxJS 6.xの構文で書き直す
メソッドチェーンにしていたものをpipe
の中にかけば良い様子です。
import { interval } from 'rxjs'
import { map, scan, filter } from 'rxjs/operators'
interval(1000)
.pipe(
filter(x => x%2 === 0),
scan((acc, curr) => acc + curr, 0),
map(x => `${x}!!`)
)
.subscribe(x => console.log(x))
これで動きました。
$ yarn test
yarn run v1.16.0
$ ./node_modules/.bin/ts-node libs/index.ts
0!!
2!!
6!!
12!!
^C
せっかくなので、もうちょっと遊んでみる
このコードを使ってもうちょっといろいろ書いてみましょう。
時間指定で停止する
Qiitaのサンプルにも記載がありますが、これもやってみましょう。
Observableをsubscribeした時の戻り値を利用してunsubscribeすれば停止できます。
const subscription = interval(1000)
.pipe(
filter(x => x%2 === 0),
scan((acc, curr) => acc + curr, 0),
map(x => `${x}!!`)
)
.subscribe(x => console.log(x))
setTimeout(() => {
subscription.unsubscribe()
console.log('end')
}, 10000)
これで10秒後停止し、メッセージを出して終了できました。
$ ./node_modules/.bin/ts-node libs/index.ts
0!!
2!!
6!!
12!!
20!!
end
✨ Done in 11.92s.
Subjectを使って並列に処理を実行する
先程の1秒毎に1増えるObserverにSubjectを渡して並列で複数の作業を実行してみます。
import { Subject, interval } from 'rxjs'
import { scan, filter } from 'rxjs/operators'
const subject = new Subject<number>()
// 偶数だけ出力する
subject.pipe(
filter((x:number) => x%2 === 0)
).subscribe(x => console.log(`偶数: ${x}`))
// 奇数だけ出力する
subject.pipe(
filter((x:number) => x%2 !== 0)
).subscribe(x => console.log(`奇数: ${x}`))
// ひたすら足していくだけ
subject
.pipe(
scan((acc, curr) => acc + curr, 0)
).subscribe(x => console.log(`合算: ${x}`))
// 区切り
subject.subscribe(() => console.log(`\n`))
const subscription = interval(1000).subscribe(subject)
// 10秒経過で終了
setTimeout(() => {
subscription.unsubscribe()
console.log('end')
}, 10000)
実行すると、以下のような結果が出てきます。
$ ./node_modules/.bin/ts-node libs/index.ts
偶数: 0
合算: 0
奇数: 1
合算: 1
偶数: 2
合算: 3
奇数: 3
合算: 6
偶数: 4
合算: 10
奇数: 5
合算: 15
偶数: 6
合算: 21
奇数: 7
合算: 28
偶数: 8
合算: 36
end
✨ Done in 12.16s.
interval
で生成された数字を、各subjectが処理して出力していくイメージですね。
APIやユーザー入力の値を並列で処理したい時などに使うとよいのかな。