goroutineでシェルっぽくバッチ処理を書いてみた


小さく分割してきれいにつくりたい、けど効率も重視したい

シェルや関数型プログラミングのように、部品の組み合わせでプログラムを書きたい。しかし、パイプラインや遅延評価みたいな仕組みがないと、ループ回数だけ増えてしまって効率が悪い。仕方がないから、1つの処理の中に全機能まとめてしまおう。そんな経験はないでしょうか。

それgo言語でできる!

go言語のgoroutineとchannelを勉強してみてまさにこれだと思い、go言語の勉強を兼ねて作ってみました。

前提環境

  • goバージョン:1.14

完成イメージ

シェルっぽく使いたいということで、シェルを意識した設計にしています。メソッドは基本的にchannelに関連付けたメソッドとして定義し、戻り値にはchannelを返すようにしました。そうすることで、メソッドチェーンをシェルのパイプに見立てて使えます。ただし、起点となる処理はchannelにすると逆に使いにくいので、普通に値を引数をとるようにします。

slip.Cat("data/slip.csv").Filter().Sort()

また、2つ以上のchannelをインプットにする処理については、メソッドチェーンで受け取れないので、引数でchannelを渡します。これは、逆にシェルにはない利点かと思います。シェルだと合流地点で同期をとる必要がありますが、goなら戻り値のchannelをそのまま渡すだけなので、見た目的にも効率的にも理想的です。

slipitem.Match(
        // tr: cat -> filter -> sort
        slip.Cat("data/slip.csv").Filter().Sort(),
        // ma: cat -> sort
        item.Cat("data/item.csv").Sort()
)

フィルターやソートの条件は都度変えたいので、引数で渡せるようにします。goでは関数が第一級オブジェクトとして扱われるので、関数の引数に関数を指定することができます。

  slip.Cat("data/slip.csv").Filter(func(slip slip.Slip) bool {
            return slip.No >= "20000"
        }).Sort(func(slips slip.Slips, i, j int) bool {
            return slips[i].ItemCode < slips[j].ItemCode
        })
)

このようなイメージでよくあるバッチ処理を書いてみました。main処理の完成形が下記です。ファイルに保存されているTR(伝票)とMA(商品)を商品コードでマッチングするイメージです。goroutineとchannelはmain処理からは隠蔽されており、自然な見た目になっていると思います。

main.go
package main

import (
    "fmt"
    "go-batch-sample/item"
    "go-batch-sample/slip"
    "go-batch-sample/slipitem"
)

func main() {
    // Cat, Filter, Sort, Matching and file out sample
    slipitem.Match(
        // tr: cat -> filter -> sort
        slip.Cat("data/slip.csv").Filter(func(slip slip.Slip) bool {
            return slip.No >= "20000"
        }).Sort(func(slips slip.Slips, i, j int) bool {
            return slips[i].ItemCode < slips[j].ItemCode
        }),
        // ma: cat -> sort
        item.Cat("data/item.csv").Sort(func(items item.Items, i, j int) bool {
            return items[i].Code < items[j].Code
        })).Out("data/slipItem.csv")
}

各メソッドの実現

メイン処理をメソッドチェーンでコーディングしていけるようにするには、go func()をメイン処理で使わないようにします。そのためには、go func()をラップするメソッドが必要です。メソッドの基本構造は、ラッパーメソッドと内部の即時関数という構造にしました。基本的な処理の流れは、戻り値用のchannelを作り、即時関数をgoroutineで非同期実行して(その中でchannelに送信していき)、戻り値用のchannelをreturnする、という流れです。また、前メソッドの戻り値のchannelをメソッドチェーンで受け取るように、channelに対してメソッドを定義します。goでは構造体にメソッドを定義することはできるのですが、channelに対しては、メソッドを定義することができません。そこで、単にchannelをラップするため型のエイリアスを設けます。

type Slip struct {
    No       string
    ItemCode string
    Count    int
}

type Slipchan chan Slip

ファイル読み込み(Cat)

ファイルの読み込みは、起点になる処理なので、Inputとしてchannelは受け取らず、ファイル名を引数にとり、戻り値にchannelを返却します。即時関数をgo func()してgoroutineで実行し、処理完了を待たずにchannelを返します。即時関数の中では、ファイルを1行1行読み込み、そのたびに、channelに送信していきます。ファイルの中身をすべて読み込み終わったら、処理を終了します。関数の最初で、defer close(ch)しているので、関数の終了に伴ってchannelがクローズされ、次のメソッドにクローズが通知されます。

func Cat(filename string) Slipchan {
    ch := make(Slipchan)
    go func(filename string, ch Slipchan) {
        defer close(ch)

        // file open
        file, err := os.Open(filename)
        if err != nil {
            panic(err)
        }
        defer file.Close()

        // read while file end
        reader := csv.NewReader(file)
        //  reader.Comma = '\t'

        for {
            line, err := reader.Read()

            switch err {
            case nil:
                // format to slip
                slip := Slip{}
                slip.No = line[0]
                slip.ItemCode = line[1]
                slip.Count, _ = strconv.Atoi(line[2])

                // send to channel
                ch <- slip

            case io.EOF:
                return

            default:
                panic(err)
            }
        }
    }(filename, ch)
    return ch
}

フィルター(Filter)

フィルターはInputとして前のメソッドからchannelを受けとります。channelの受け取りはメソッドチェーンとして行うので、channelの別名Slipchan型に対するメソッドとして定義します。また、フィルタリング条件は、検証関数を引数として受け取る形とします。戻り値の考え方は同じです。

func (ich Slipchan) Filter(test func(slip Slip) bool) Slipchan {
    och := make(Slipchan)
    go func(ich, och chan Slip) {
        defer close(och)
        for slip := range ich {
            if test(slip) {
                och <- slip
            }
        }
    }(ich, och)
    return och
}

ソート(Sort)

ソートもフィルターと同様、channelをメソッドチェーンで受け取り、channelを戻り値で返します。ただ、ソート条件の関数を渡すのに苦戦しました。sort.SliceStableを利用すると、条件を実行時に渡す安定ソートが行えるのですが、ラッパーメソッドの引数に渡した関数をそのままsort.SliceStableの引数に渡そうとするとうまくいきませんでした。SliceStableでは、第1引数のソート対象のsliceを第2引数の比較関数の中で利用する形で関数を定義します。クロージャーのようなイメージです。そのため、ラッパーメソッドの引数をそのまま利用するのは難しく、苦肉の策として、内部の即時関数の中で呼び出すsort.SliceStableに渡すの比較関数の中の比較処理として呼び出してもらうような関数を渡すことにしました。(ソースを見たほうが早そうですね。)

type Slips []Slip

func (ich Slipchan) Sort(sortfn func(slips Slips, i, j int) bool) Slipchan {
    och := make(Slipchan)
    go func(ich, och chan Slip) {
        defer close(och)
        tmpSlips := Slips{}
        for slip := range ich {
            tmpSlips = append(tmpSlips, slip)
        }

        sort.SliceStable(tmpSlips, func(i, j int) bool {
            return sortfn(tmpSlips, i, j)
        })

        for _, slip := range tmpSlips {
            och <- slip
        }
    }(ich, och)
    return och
}

mainから利用するときはこうなります。

main.go
.Sort(func(slips slip.Slips, i, j int) bool {
            return slips[i].ItemCode < slips[j].ItemCode
        })

現状のソートは、一度メモリにすべてため込んでからソートをかけています。もう少し特性にあったソートに変えたいところです。

マッチング処理(Match)

マッチングは2つのchannelが必要なため、メソッドチェーンではなく、引数でchannelを渡します。channelは最初からすべてのデータがわかるわけではないので、Mapのようにkey指定で取得することはできません。また、channelは1週しかできないので、単純に二重ループしてマッチングすると、エラーになってしまします。ma/trをひとつづつchannelから受信していく必要があります。全trの終了の判定は、channelから受信できなくなった(closeされた)のを、チェックするようにしています。

func Match(trch slip.Slipchan, mach item.Itemchan) SlipItemchan {
    outch := make(SlipItemchan)

    go func(trch slip.Slipchan, mach item.Itemchan, outch SlipItemchan) {
        defer close(outch)

        tr := <-trch
        ma := <-mach

        for {
            switch {
            case tr.ItemCode == ma.Code:
                // match
                slipItem := SlipItem{}
                slipItem.Slip = tr
                slipItem.ItemName = ma.Name
                outch <- slipItem

                trtmp, ok := <-trch
                if ok {
                    // if tr exists, update tr
                    tr = trtmp
                } else {
                    // if tr ends, fisnish
                    return
                }

            default:
                matmp, ok := <-mach
                if ok {
                    // if master exists, update ma
                    ma = matmp
                } else {
                    // if master ends, error
                    panic("ma ends")
                }
            }
        }
    }(trch, mach, outch)

    return outch
}

ファイル出力(Out)

ファイル出力は、channelをメソッドチェーンで受け取って、channelから逐次ファイル書き込みしていきます。ファイル出力しきって処理完了としたいため、goroutine化せずに、同期処理として書いています。teeみたいにしたい場合は、その他のメソッドと同じように内部の即時関数を使ってgoroutineにしてもよいかもしれません。

func (ch SlipItemchan) Out(filename string) {
    file, err := os.Create(filename)
    if err != nil {
        panic(err)
    }
    defer file.Close()
    for slipItem := range ch {
        fmt.Fprintln(file, slipItem.No, ",", slipItem.ItemCode, ",", slipItem.ItemName, ",", slipItem.Count)
    }
}

go言語はイケてる

goroutineを隠蔽化してmainメソッドはかなりすっきり表現できたと思います。各データ毎に基本セットを用意しておけば、生産性高くメイン処理を書いていけるのではないでしょうか。それぞれのメソッドの実装も特に複雑なことはなく、単純なパターンに落とし込めているかと思います。ただ、いくらgoroutineがプロセスやスレッドに比べて軽量といっても、こんな感じでポコポコ起動してよいものなのか勘所がないので、その検証は必要そうです。また、効率を考えてはいるものの、本当にシェルのパイプラインのように高速に動いてくれるのかは検証できていません。時間があれば試してみたいです。