How it works(8)GDAL 2 Mbtilesソース読解(B)映像切断と処理


vips.py
最も重要なカットモジュールはlibvipsという高速軽量のc++モジュールのpyバインドpyvipsを使用し、これもg 2 mの中で最大のモジュールである.カット図は主に2つの部分に分かれています.
  • タイル仕切り
  • 帯域処理
  • タイル分割
    瓦片分割はg 2 mの核心機能である.基本的なプロセスは次のとおりです.
  • 取得ピクチャ(オリジナルまたは再サンプリング後のピクチャを直接使用)
  • 画像から固定サイズ
  • を切り出す.
  • は、当該部分が属する行列番号を算出して記憶する.

  • 実際の使用では、特定のレベルだけをとることはほとんど不可能であるため、低スケールレベルのダウンサンプリングや地図自体の解像度を超えるアップサンプリングは避けられない.再サンプリングを必要とするスケールレベルでは、g 2 mは、サンプリング前に図面全体をシミュレーション変換してから、裁断後ではなく瓦上で直接裁断する、このような効果はかなり悪いためである.図面全体の流れは以下の通りです.
    #        
    if min_resolution <= self.resolution <= max_resolution:
              self.slice_native(tiles, fill_borders=fill_borders)
    #           
    if 0 <= min_resolution < self.resolution:
        self.slice_downsample(tiles=tiles,
                              min_resolution=min_resolution,
                              max_resolution=max_resolution,
                              fill_borders=fill_borders)
    #           
    if self.resolution < max_resolution:
        self.slice_upsample(tiles=tiles,
                            min_resolution=min_resolution,
                            max_resolution=max_resolution,
                            fill_borders=fill_borders)
    

    リサンプル
    再サンプリングには、画像全体をレベル毎にスケーリングする、すなわち、画像のシミュレーション変換が必要である.ダウンサンプリングを例にとります(アップサンプリングの考え方はまったく同じです):
    def slice_downsample(self, tiles, min_resolution, max_resolution=None,
                             fill_borders=None):
            """
                      A,A       18 ,     12  20    
            """
            #       ,                
            #   ,      17 
            if max_resolution is None or max_resolution >= self.resolution:
                max_resolution = self.resolution - 1
    
            with LibVips.disable_warnings():
                #                     
                #   ,A      1 
                tiles = tiles.downsample(levels=(self.resolution - max_resolution),)
                for res in reversed(list(range(min_resolution, max_resolution + 1))):
            #        A
                    tiles._slice()
            #     ,          ,      
                    if res > min_resolution:
                        tiles = tiles.downsample(levels=1)
                        
        def downsample(self, levels=1):
            """
               ,           
            """
            image = self.image
            offset = self.offset
        #             
            parent = self._parent if self._parent is not None else self
            parent_resolution = parent.resolution
            parent_size = VImageAdapter(parent.image).BufferSize()
            #      level 1,          1 
            for res in reversed(list(range(self.resolution - levels, self.resolution))):
               #      ,     X,Y   (      ,      )
                offset /= 2.0
                #       ,     ,           (      ,          )
                shrunk = VImageAdapter(image).shrink_affine(xscale=0.5, yscale=0.5)
                #                
                image = VImageAdapter(shrunk).tms_align(tile_width=self.tile_width,
                                         tile_height=self.tile_height,
                                         offset=offset)
                offset = offset.floor()
                # VIPS     :       ,            (  IMAGE_BUFFER_INTERVAL 4 )    ,      .
                #   ,       ,          4  ,           .
                if parent_resolution - res >= self.IMAGE_BUFFER_INTERVAL:
                    if parent_size < self.IMAGE_BUFFER_MEMORY_THRESHOLD:
                        #        (  IMAGE_BUFFER_MEMORY_THRESHOLD 1M),       
                        continue
            #         (  1M    ,  1G     )
                    image = self.write_buffer(image=image, resolution=res)
                    parent_resolution = res
                    parent_size = VImageAdapter(image).BufferSize()
        #                   
            if parent_resolution < parent.resolution:
                parent = None
          #      level 1,            .
          #  level  1,                       ,      
                if parent_resolution != res:
                    image = self.write_buffer(image=image, resolution=res)
        #       
            result = self.__class__(image=image,
                                    storage=self.storage,
                                    tile_width=self.tile_width,
                                    tile_height=self.tile_height,
                                    offset=offset,
                                    resolution=res)
            #      
            result._parent = parent
            return result
    

    ここでは、シミュレーション変換とオフセット補正の2つの重要な方法が用いる.
    シミュレーションへんかん
    画像の拡大と縮小はシミュレーション変換に関連しています.
    def shrink_affine(self, xscale, yscale, output_size=None):
        return self._scale(
            xscale=xscale, yscale=yscale, output_size=output_size, interpolate='bilinear'
        )
        
     def _scale(self, xscale, yscale, output_size, interpolate):
            if output_size is None:
                if XY(x=xscale, y=yscale) > XY(x=1.0, y=1.0):
                  #         
                    output_width = int(ceil(self.image.width * xscale))
                    output_height = int(ceil(self.image.height * yscale))
                else:
                  #       
                    output_width = int(floor(self.image.width * xscale))
                    output_height = int(floor(self.image.height * yscale))
            else:
                output_width, output_height = output_size
    
         #        ,  b,c 0.      :
            # [[xscale, 0],
            # [ 0, yscale]]
            a, b, c, d = xscale, 0, 0, yscale
            if interpolate == 'near':
              #           
                offset_x = offset_y = 0
            else:
                #     
                offset_x = (a - 1) / 2
                offset_y = (d - 1) / 2
    
            output_x, output_y = 0, 0
            return self.affine(a=a, b=b, c=c, d=d, dx=offset_x, dy=offset_y,
                               ox=output_x, oy=output_y,
                               ow=output_width, oh=output_height,
                               interpolate=interpolate)
        
    def affine(self, a, b, c, d, dx, dy, ox, oy, ow, oh,
               interpolate='bilinear'):
        if interpolate == 'near':
            interpolate = 'nearest'
    
        if interpolate == 'bilinear' or interpolate == 'nearest':
            interpolate = Interpolate.new(interpolate)
    
        image = self.image.affine(
            [a, b, c, d],
            interpolate=interpolate,
            oarea=[ox, oy, ow, oh],
            odx=dx, ody=dy,
            idx=0, idy=0
        )
        #               ,             c++     
        image.__inputref = self.image
        return image
    

    オフセットの修正
    瓦の画素座標が整数でない場合に発生するようなオフセットは、いつ発生するか分かりません.
       def tms_align(self, tile_width, tile_height, offset):
            #         
            x = int(round(offset.x * tile_width)) % tile_width
            y = int(round(self.image.height - offset.y * tile_height)) % tile_height
            tiles_x = ceil((self.image.width + x / 2) / tile_width)
            tiles_y = ceil((self.image.height + y / 2) / tile_height)
            width = int(tiles_x * tile_width)
            height = int(tiles_y * tile_height)
            if width == self.image.width and height == self.image.height:
                #            
                assert x == y == 0
                return self.image
    
            #                   
            return self.image.embed(
                x, y, width, height, background=[0, 0, 0, 0] # Transparent
            )
    

    画像とストレージの切断
    画像の再サンプリングが完了すると、サンプリング後の画像を切断することができる.もちろん、元のスケーリングレベルでは、再サンプリングを行う必要はなく、直接このステップに進むことができる.
    def _slice(self):
            with LibVips.disable_warnings():
              #            ,                  
                for y in range(0, self.image_height, self.tile_height):
                    for x in range(0, self.image_width, self.tile_width):
                        out = self.image.extract_area(x, y,self.tile_width, self.tile_height)
                        #           
                        offset = XY(
                            x=int(x / self.tile_width + self.offset.x),
                            y=int((self.image_height - y) / self.tile_height +
                                  self.offset.y - 1)
                        )
                        #       (   mbtiles        )
                        self.storage.save(x=offset.x, y=offset.y,
                                          z=self.resolution,
                                          image=out)
    

    コードにずっと現れるオフセット(offset)は、実はこの地図の左下隅座標に対応する行列番号である.右と上を正方向とする座標系である.しかしvips処理画像については左上が原点であり、正下と正方向である.従って、異なるタイルの行列番号を算出する際には、変換が必要となる.
    バンド処理
    帯域処理は必要ないが、g 2 mは依然として機能を提供し、帯域の処理には純粋な数学的方式を採用している.基本的なプロセスは次のとおりです.
  • 画像をNumpy形式
  • に変換
  • NumExpr,演算式を用いて帯域処理
  • を完了する.
  • Numpy形式のデータを画像
  • に戻す
    中でも比較的特別な方法は、g 2 mが式を用いる色変換の計算である.
    def colorize(self, image, nodata=None):
        #       numpy  
        data = numpy.frombuffer(buffer=image.write_to_memory(),
                                dtype=VImageAdapter(image).NumPyType())
    
        #   numexpr        rgba
        bands = self._colorize_bands(data=data, nodata=nodata)
    
        #           vips   
        images = [VImageAdapter.from_numpy_array(
            array=band, width=image.width, height=image.height, bands=1,
            format='uchar'
        ) for band in bands]
        return VImageAdapter.gbandjoin(bands=images)
        
    def _colorize_bands(self, data, nodata=None):
      #   rgba
      for band in 'rgba':
          expr = self._expression(band=band, nodata=nodata)
          if expr is None:
              #                
              array = numpy.empty(shape=data.size, dtype=numpy.uint8)
              array.fill(self._background(band=band))
              yield array
          else:
              #      
              yield numexpr.evaluate(self._expression(band=band,
                                                      nodata=nodata),
                                     local_dict=dict(n=data.copy()),
                                     global_dict={})
        
    def _expression(self, band, nodata=None):
      #             _clauses  
        clauses = self._clauses(band=band, nodata=nodata)
      #        
        result = str(getattr(self.BACKGROUND, band))
        #      
        for expression, true_value in clauses:
            result = 'where({expression}, {true}, {false})'.format(
                expression=expression, true=true_value, false=result
            )
        return result
    

    画像の処理は、ユーザ定義の処理方法に関係しており、ここでは、特定の色変換という処理方法を例に挙げる.
    def _clauses(self, band, nodata=None):
      #    ,        ,             .
      #        
      # {
      #     0:rgba(255,0,0,255)
      # }
      #   r  ,     0,      255
        colors = self._colors(band=band)
        background = self._background(band=band)
      #       [('n == 0',255)]    ,
      # numexpr       numpy     
        return [('n == {0!r}'.format(band_value),  
                 color)                            
                for band_value, color in colors
                if band_value != nodata and color != background]