PythonのMapReduceをローカルDocker Hadoopクラスタで実行する



導入
このポストは、古典的なワード数例を使用してカスタムのPythonマッパーと減速機能を実行するためにローカルDocker Hadoopクラスタを展開する方法を扱います.

環境設定
Docker、取得Dockerhere
Dockerの作成、Dockerを作成するhere
Git、Gitを得るhere

Dockerを使用したHadoopクラスタの配備
我々は、Dockerイメージを使用しますbig-data-europe repository Hadoopを設定する.
git clone [email protected]:big-data-europe/docker-hadoop.git
ローカルマシン上のHadoop用のDockerイメージを使用すると、Dockerを使用してローカルHadoopクラスタを設定できます.置換するdocker-compose.yml 以下のファイルをthis GitHub Gist .
このDocker構成ファイルは、Hadoopクラスタをマスターノード(namenode)と3つのワーカーノードに設定します.また、ノード間の通信を許可するようにネットワークポートを構成します.クラスタを起動するには、次の手順を実行します.
docker-compose up -d
用途docker ps コンテナが確認されるには、次のようなコンテナリストが表示されます.
IMAGE                           PORTS                    NAMES
docker-hadoop_resourcemanager                            resourcemanager
docker-hadoop_nodemanager1      0.0.0.0:8042->8042/tcp   nodemanager1
docker-hadoop_historyserver     0.0.0.0:8188->8188/tcp   historyserver
docker-hadoop_datanode3         9864/tcp                 datanode3
docker-hadoop_datanode2         9864/tcp                 datanode2
docker-hadoop_datanode1         9864/tcp                 datanode1
docker-hadoop_namenode          0.0.0.0:9870->9870/tcp   namenode
ローカルHadoopクラスタの現在の状態はlocalhost:9870

Python MapReduce関数の実行
この単純なMapReduceプログラムのために、我々は古典的な語カウント例を使用します.プログラムは、テキストファイルを読み取り、各単語が発生する頻度をカウントします.
マッパー関数はテキストを読み込み、キー値のペアを出力します<word, 1> . 次のコードをコピーしますmapper.py
#!/usr/bin/env python
"""mapper.py"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print ('%s\t%s' % (word, 1))
還元子関数はマッパーから結果を処理し、単語カウントを返します.次のコードをコピーしますreducer.py
#!/usr/bin/env python
"""reducer.py"""

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print ('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print ('%s\t%s' % (current_word, current_count))
HadoopがJavaで構築されたApacheサーバで動作するため、プログラムはJava jarファイルを入力として受け取ります.HadoopでPythonを実行するには、Hadoop Streaming library Python実行可能ファイルをJavaフレームワークにパイプする.その結果、stdinからPython入力を処理する必要があります.
ローカルコピーmapper.py and reducer.py 名前ノードに
docker cp LOCAL_PATH/mapper.py namenode:mapper.py
docker cp LOCAL_PATH/reducer.py namenode:reducer.py
Hadoopクラスタの名前空間コンテナを入力します.
docker exec -it namenode bash
ランls そして、あなたは見つける必要がありますmapper.py and reducer.py NameNodeコンテナで.
それでは入力を準備しましょう.この簡単な例では、短い文字列でテキストファイルのセットを使用します.より現実的な例についてはProject Gutenberg , ダウンロードPlain Text UTF-8 エンコード.
mkdir input
echo "Hello World" >input/f1.txt
echo "Hello Docker" >input/f2.txt
echo "Hello Hadoop" >input/f3.txt
echo "Hello MapReduce" >input/f4.txt
Hadoop分散ファイルシステム( HDFS )からのMapReduceプログラムアクセスファイル.次のコマンドを実行して、入力ディレクトリとファイルをHDFSに転送します.
hadoop fs -mkdir -p input
hdfs dfs -put ./input/* input
用途find / -name 'hadoop-streaming*.jar' Hadoopの文字列ライブラリJARファイルを探します.パスは、次のようになりますPATH/hadoop-streaming-3.2.1.jar最後に、MapReduceプログラムを実行できます.
hadoop jar /opt/hadoop-3.2.1/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar \
-file mapper.py    -mapper mapper.py \
-file reducer.py   -reducer reducer.py \
-input input -output output
クラスタをシャットダウンしてコンテナを削除するには、次の手順に従います.
docker-compose down

リファレンス
円DockerでHadoopクラスタを設定する方法.
から取得します.here
PythonでHadoop MapReduceプログラムを書く
から取得します.here