Goを使用した簡単なデータパイプラインの作成


こんにちは!
私はカルビ餃子が好きな開発者の任太彬です.

Goは、チャネルを使用して簡単なパイプラインを作成しようとします.

データを作成し、チャネルを介して転送、出力、和を求めるコード.

コードが実装される前に、簡単に図を描きます.

コードの実装を始めましょう.
まず完全なコードを共有します.
package main

import "log"

func GenerateData() <-chan int {
	oc := make(chan int)
	go func() {
		i := 0
		for {
			oc <- i
			i = i + 1
			if i == 15 {
				break
			}
		}
		close(oc)
	}()
	return oc
}

func PrintData(ic <-chan int) <-chan int {
	oc := make(chan int)
	go func() {
		for data := range ic {
			log.Println(data)
			oc <- data
		}
		close(oc)
	}()
	return oc
}

func TotalSumOfData(ic <-chan int) {
	total := 0
	for data := range ic {
		total += data
	}
	log.Println("total: ", total)
}

func main() {
	TotalSumOfData(PrintData(GenerateData()))
}
まず最初に表示するコードはGenerateDataです.この関数は、データを生成します.
func GenerateData() <-chan int {
	oc := make(chan int)
	go func() {
		i := 0
		for {
			oc <- i
			i = i + 1
			if i == 15 {
				break
			}
		}
		close(oc)
	}()
	return oc
}
ここでsyncですWaitGroupは使用されていません.
これは、main関数セクションで「TotalSumOfData」(PrintData)が関数によって異なるためです.
まず、チャネルを使用するには、チャネルに値を入力し、すぐに消費する必要があります.GenerateDataにはこのような内容はありませんが、PrintDataはRangeによって実現されています.だからsyncを選びましたWaitGroupは使用されていません.

2番目のコードはPrintDataです.この関数は、データが正しく入力されているかどうかを確認するために使用します.
func PrintData(ic <-chan int) <-chan int {
	oc := make(chan int)
	go func() {
		for data := range ic {
			log.Println(data)
			oc <- data
		}
		close(oc)
	}()
	return oc
}
このセクションでは、上記の範囲がGenerateDataで生成されるチャネル消費値になります.その後、TotalSumOfDataの新しいチャネルを再作成します.

TotalSumOfDataは、チャネル内のすべての値を追加することによって最終値を作成する最後に表示するコードです.
func TotalSumOfData(ic <-chan int) {
	total := 0
	for data := range ic {
		total += data
	}
	log.Println("total: ", total)
}
リアルタイムデータを処理しながらパイプラインを作成する簡単なコードを作成できると思います.次回は、Fluentdで入力したログデータを処理して作成したいと思います.

この投稿を読んでくれてありがとう.
https://github.com/tae2089/golang/blob/main/datapipeline/main.go