RxJS を使ってあらゆるWeb素材を抽象化し、fp-tsで自在に読み込む

52248 ワード

素材の抽象化

画像、動画、音声、Three.jsの素材、etc...、Webで使う素材は様々。
これら全てに個別の読み込み処理を書くよりも、素材として一つに抽象化できれば扱いやすい。

事前にローディング画面を表示しつつ使う素材を一気に読み込む
使うタイミングで都度読み込む
など読み込みを行うタイミングも様々だが、ここではシンプルに

  • 読み込まれていなかったら読み込みを開始し、読み込みが完了したら素材を返す
  • すでに読み込みが完了していたらその素材を返す

ようにできれば使い勝手が良さそうだ。

TypeScript と RxJS で抽象クラスを定義する

Promise を使ってもよいが、RxJSを使う理由としてはオペレータが充実していて、後述するが実際に使う際に直列、並列、もしくは最初に一つだけ読み込んでその後複数読み込む、などなど様々な処理がしやすいため。

構成要素はシンプルで

  • 素材のurl src: string
  • 素材そのもの asset$: Observable<T>

の2つとする。
これを TypeScript を用いて抽象クラスを書くと下記のようになる。

import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators'

export abstract class ObservableAsset<T> {
  readonly src: string
  private cache$: Observable<T> | undefined = undefined

  constructor(src: string) {
    this.src = src
  }

  get asset$() {
    if (!this.cache$) {
      this.cache$ = this.load().pipe(shareReplay(1))
    }
    return this.cache$
  }

  clear = () => {
    this.unload?.()
    this.cache$ = undefined
  }

  protected abstract load: () => Observable<T>
  protected abstract unload: (() => void) | undefined
}

素材を T として Generics で書いている。
実際に使う素材は asset$Observable<T>で、まだ読み込まれていなかったら load() メソッドで読み込みを開始して shareReplay(1) でキャッシュ化、それを cache$ に格納し、すでに読み込みが完了していたらその素材(cache$)を返すようにしている。

clear()unload() を用意しているのは、使わなくなった場合にメモリを解放するため。解放処理自体は unload() に書く。

以下はこの ObservableAsset をもとに、素材別に実体のクラスを書いていく。

画像 ObservableImage

import { Observable } from 'rxjs'

import { ObservableAsset } from './ObservableAsset'

export type ObservableImageProps =
  | {
      type: 'loaded'
      value: HTMLImageElement
    }
  | {
      type: 'loading'
      value: {
        loaded: number
        total: number
      }
    }

export class ObservableImage extends ObservableAsset<ObservableImageProps> {
  protected load = () => {
    const observable = new Observable<ObservableImageProps>((subscriber) => {
      const image = new Image()
      image.onload = () => {
        subscriber.next({ type: 'loaded', value: image })
        subscriber.complete()
      }
      image.onprogress = (progress) => {
        if (progress.lengthComputable && progress.total > 0) {
          const result = {
            type: 'loading' as const,
            value: {
              loaded: progress.loaded,
              total: progress.total,
            },
          }
          subscriber.next(result)
        }
      }

      image.onerror = (error) => {
        subscriber.error(error)
      }
      image.src = this.src
    })

    return observable
  }

  protected unload = undefined
}

素材を T として表すと書いたが、実際には素材の状態を書いている。
ここでは ObservableImagePropsT にあたる。
状態は読込済みと読込中の2種類があり、それぞれ loaded, loading で表していて、value で読込済みの場合 HTMLImageElement、読込中の場合 totalloaded で進捗状況を取得できるようにしている。

unloadclearcache$ の参照が無くなると自動で削除されるため[1]実装していない。

この ObservableImage を実際に使うには下記の様に書く。

const image = new ObservableImage('画像のパス')
const subscription = image.asset$.subscribe((asset) => {
  if (asset.type === 'loaded') {
    // 読み込み完了処理
    // asset.value に HTMLImageElement が入っている
  } else if (asset.type === 'loading') {
    // 読込中の処理
    // total と loaded で読み込みの進捗を計算できる
    const { total, loaded } = asset.value
    const progress = loaded / total
  }
})

// 使わなくなったら unsubscribe
subscription.unsubscribe()

動画 ObservableVideo

import { Observable } from 'rxjs'
import axios from 'axios'

import * as O from 'fp-ts/lib/Option'
import { ObservableAsset } from './ObservableAsset'

export type ObservableVideoProps =
  | {
      type: 'loaded'
      value: string
    }
  | {
      type: 'loading'
      value: {
        loaded: number
        total: number
      }
    }

export class ObservableVideo extends ObservableAsset<ObservableVideoProps> {
  private videoSrc: O.Option<string> = O.none
  protected load = () => {
    const observable = new Observable<ObservableVideoProps>((subscriber) => {
      const cancelToken = axios.CancelToken.source()
      axios({
        url: this.src,
        method: 'GET',
        timeout: 10 * 1000, // ms
        responseType: 'blob',
        onDownloadProgress: (progress: ProgressEvent<EventTarget>) => {
          if (progress.lengthComputable && progress.total > 0) {
            const result = {
              type: 'loading' as const,
              value: {
                loaded: progress.loaded,
                total: progress.total,
              },
            }
            subscriber.next(result)
          }
        },
        cancelToken: cancelToken.token,
      })
        .then((response) => {
          const data = new Blob([response.data], { type: 'video/mp4' })
          const url = URL.createObjectURL(data)
          this.videoSrc = O.some(url)

          subscriber.next({ type: 'loaded', value: url })
          subscriber.complete()
        })
        .catch((error) => {
          subscriber.error(error)
        })

      return () => {
        cancelToken.cancel()
      }
    })

    return observable
  }

  protected unload = () => {
    if (O.isSome(this.videoSrc)) {
      URL.revokeObjectURL(this.videoSrc.value)
    }
    this.videoSrc = O.none
  }
}

動画の場合は若干特殊でバイナリーデータとして読み込む。Blobを作成し、URL.createObjectURL() を呼び出して、BlobURL に変換している。
この場合は unload 処理が必要なので URL.revokeObjectURL で解放している。

videoSrc に関数型言語ライブラリ fp-tsOption を使っていて、ここでは詳細な説明は省くが string | undefinde のようなものだと考えてよい。undefinde の場合は Option.nonestringの場合は Option.some、で valuestring が入っているイメージだ。

またここでは axios を使っているが、ky などを使っても同様のものが書けるはず。

Three.js のテキスチャ ObservableThreeTexture

最後に Three.js のテキスチャのクラスを紹介する。
やっていることは Three.js が用意している TextureLoader を使っているだけで前述の例とほぼ同様だ。

import { Observable } from 'rxjs'
import { Texture, TextureLoader } from 'three'

import { ObservableAsset } from '.ObservableAsset'

export type ThreeTextureProps = {
  width: number
  height: number
  texture: Texture
}

export type ObservableThreeTextureProps =
  | {
      type: 'loaded'
      value: ThreeTextureProps
    }
  | {
      type: 'loading'
      value: {
        loaded: number
        total: number
      }
    }

export class ObservableThreeTexture extends ObservableAsset<ObservableThreeTextureProps> {
  protected load = () => {
    const observable = new Observable<ObservableThreeTextureProps>((subscriber) => {
      const loader = new TextureLoader()
      loader.load(
        this.src,
        (data) => {
          const image = data.image as unknown as { width: number; height: number }
          const result = {
            type: 'loaded' as const,
            value: {
              width: image.width,
              height: image.height,
              texture: data,
            },
          }
          subscriber.next(result)
          subscriber.complete()
        },
        (progress) => {
          if (progress.lengthComputable && progress.total > 0) {
            const result = {
              type: 'loading' as const,
              value: {
                loaded: progress.loaded,
                total: progress.total,
              },
            }
            subscriber.next(result)
          }
        },
        (error) => {
          subscriber.error(error)
        },
      )
    })
    return observable
  }

  protected unload = undefined
}

読み込み処理応用

以下は大量の素材を、32素材ずつに分けて読み込む処理。
fp-ts を使って素材の読み込みを32のチャンク(chunksof)に分けて並列(merge)に読み込み、それらを直列(concat)に繋いで読み込んでいる。
このような処理を書くときに fp-tspipe を使って関数合成ができ、コードも読みやすくなるため大変便利。

import { Observable, concat, merge, map } from 'rxjs'
import * as A from 'fp-ts/lib/Array'
import { pipe } from 'fp-ts/function'

const loadAssets$ = <T, U extends ObservableAsset<T>>(assets: Array<U>, numLoad: number) => {
  // 複数のチャンクに分けて並列読み込み
  const loadObservables = pipe(
    assets,
    A.map((asset) => asset.asset$),
    A.chunksOf(numLoad), // 同時に読み込む数
    A.map((o) => merge(...o)),
  )
  const loader$ = concat(...loadObservables).pipe(
    map((asset, index) => ({ asset: asset, count: index + 1, total: assets.length })),
  )
  return loader$
}

const assets = [
  new ObservableImage('パス'),
  new ObservableVideo('パス'),
  new ObservableThreeTexture('パス'),
  // ...以下いくつでも
]
const numLoad = 32 // 同時に読み込む数
const subscription = loadAssets$(assets, numLoad).subscribe({
  next: ({ count, total }) => {
    // 進捗(いくつ読み込まれたか)
    const progress = count / total
    // 進捗処理を書く
  },
  complete: () => {
    // 全て読み込み完了
  },
})

上のコードを少し変えて、最初の1素材だけはすぐに読み込んで、残りはチャンクに分けて並列読み込む場合を考える。その場合は、畳み込み関数の foldLeft を使って配列の最初の要素と残りに分けてやればよい。

const loadAssets2$ = <T, U extends ObservableAsset<T>>(assets: Array<U>, numLoad: number) => {
  const loadObservables = pipe(
    assets,
    A.map((asset) => asset.asset$),
    A.foldLeft(
      () => [[]],
      (head, tail) => [[head], ...A.chunksOf(numLoad)(tail)],
    ),
    A.map((o) => merge(...o)),
  )
  const loader$ = concat(...loadObservables).pipe(
    map((asset, index) => ({ asset: asset, count: index + 1, total: assets.length })),
  )
  return loader$
}

最初に読み込んだ素材を取り急ぎ表示したい場合は、assets[0] にあたる素材を別の場所で subscribe しておけば、読み込み処理と表示処理を別にし、宣言的に書ける。


脚注
  1. 実際にはWebブラウザの任意のタイミング ↩︎