RxJS 6: Observableで非同期の処理を行う


RxJSObservableオブジェクトを軸にしたリアクティブプログラミングのJavaScriptライブラリです。非同期の処理やイベントにもとづくコードが簡単に組み立てられます。本稿では、RxJSのObservableでweb APIのデータを読み込んでみます。なお、RxJSについて、詳しくは「RxJS 6入門」01〜07をお読みください。

01 RxJSライブラリをCDNで読み込む

RxJSを手っ取り早く試すには、CDNから読み込むのが手軽でしょう。HTMLドキュメントの<head>要素に、<script>要素をつぎのように加えます。本稿執筆時の最新リリースバージョンはRxJS 6.2.2です。

<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>

npmなどを用いたそのほかのインストールの仕方についてはGitHubのRxJS 6「Installation and Usage」または「Installation」をご覧ください。ただし、後者のCDNはバージョン5.0.0のままリンクが切れているようです。

02 簡単な非同期の処理にObservableを使ってみる

まずは、Observableを使わない非同期の処理です。つぎの関数(async())に、setTimeout()メソッドで時間待ちの処理を定めました。関数の引数に渡されたミリ秒ののちに、console.log()メソッドで文字列が出力されます。

function async(arg) {
    console.log('start');
    setTimeout(() =>
        console.log(`${Math.floor(arg / 1000)} second delay is up`)
    , arg);
}
async(1000);

// コンソール出力
start
1 second delay is up

RxJSの処理はObservableオブジェクトをつくることから始まります。そのためのメソッドはObservable.create()です。渡す関数には、データをどう処理して送るのか定めます。データを送るメソッドが、関数の引数(Observerオブジェクト)に対して呼び出すObserver.next()です。

Observableの処理を実行するには、Observable.subscribe()メソッドを呼び出さなければなりません。引数に渡す関数が、Observer.next()により送られてくるデータにどう処理を加えるのか定めます。

const {Observable} = rxjs;
function async(arg) {
    return Observable.create((observer) => {
        console.log('start');
        setTimeout(() => {
            observer.next(arg);
        }, arg);
    });
}
async(1000)
.subscribe(
    (arg) => console.log(`${Math.floor(arg / 1000)} second delay is up`)
);
// コンソール出力
start
1 second delay is up

03 XMLHttpRequestとObservableでweb APIのデータを読み込む

つぎは、webで公開されているAPIをXMLHttpRequestオブジェクトで読み込んでみます。APIとして用いるのは、「Open Notify」の「How Many People Are In Space Right Now」です。NASAのデータにもとづいて、今宇宙でどれだけの人たちが働いているのかわかります。XMLHttpRequestクラスの使い方について、詳しくは「XMLHttpRequest の利用」をお読みください。

XMLHttpRequestオブジェクトでデータを読み込み終えたときの処理は、loadイベントのリスナーとして定めます。XMLHttpRequest.responseプロパティでJSONデータが得られますので、調べたいプロパティ値を取り出せばよいでしょう。

const url = 'http://api.open-notify.org/astros.json';
function getSpacePeople(url) {
    return Rx.Observable.create((observer) => {
        const request = new XMLHttpRequest();
        request.addEventListener('load', (event) =>
            observer.next(request.response)
        );
        request.open('GET', url);
        request.send();
    });
}
getSpacePeople(url)
.subscribe(
    (data) => console.log(JSON.parse(data).people)
);

04 イベント待ちのオブジェクトをObservableにする

fromEvent()関数は、イベントを待つオブジェクトのObservableをつくります。引数は、オブジェクトとイベントのふたつです。前掲コードのXMLHttpRequestオブジェクトがloadイベントを待つ処理はつぎのように書き替えられます。

const {Observable, fromEvent} = rxjs;
const url = 'http://api.open-notify.org/astros.json';
function getSpacePeople(url) {
    return Observable.create((observer) => {
        const request = new XMLHttpRequest();
        /* request.addEventListener('load', (event) =>
            observer.next(request.response)
        ); */
        fromEvent(request, 'load')
        .subscribe((event) => observer.next(request.response));
        request.open('GET', url);
        request.send();
    });
}
getSpacePeople(url)
.subscribe(
    (data) => console.log(JSON.parse(data).people)
);

05 処理の失敗を送る

Observableの処理の失敗は、Observer. error()で送ります。受け取ったエラーは、Observable.subscribe()メソッドの第2引数に渡す関数が扱います。前掲コードにエラーの処理を加えたのがつぎのコードです。実際の動きを確かめてコードが試せるように、以下のサンプル001をjsdo.itに掲げました。

const {Observable, fromEvent} = rxjs;
const url = 'http://api.open-notify.org/astros.json';
function getSpacePeople(url) {
    return Observable.create((observer) => {
        const request = new XMLHttpRequest();
        fromEvent(request, 'load')
        .subscribe((event) => observer.next(request.response));
        fromEvent(request, 'error')
        .subscribe((event) => observer.error(request));
        request.open('GET', url);
        request.send();
    });
}
getSpacePeople(url)
.subscribe(
    (data) => console.log(JSON.parse(data).people),
    (request) => console.error('エラー:', request.status)
);

サンプル001■RxJS + ES6: Getting number of people in space with Promise object


>> jsdo.itへ