Running Python MapReduce programs on Hadoop 2.0
Note: every script below starts with #!/usr/bin/python, so the .py files can be executed directly.
1) Example 1: count how many times each IP appears in the access logs (and the total number of distinct IPs).
#################### Local test ############################
cat /home/hadoop/Sep-2013/*/* | python ipmapper.py | sort | python ipreducer.py
Output:
99.67.46.254 13
99.95.174.29 47
sum of single ip 13349
##################### Running on Hadoop ############################
bin/hadoop jar ./share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -mapper /data/hadoop/jobs_python/job_logstat/ipmapper.py -reducer /data/hadoop/jobs_python/job_logstat/ipreducer.py -input /log_original/* -output /log_ipnum -file /data/hadoop/jobs_python/job_logstat/ipmapper.py -file /data/hadoop/jobs_python/job_logstat/ipreducer.py
Output:
99.67.46.254 13
99.95.174.29 47
sum of single ip 13349
ipmapper.py:
##########################mapper #######################################
#!/usr/bin/python
# -*- coding: utf-8 -*-
import re
import sys

# extract the client IP and the requested path from each access-log line
pat = re.compile(r'(?P<ip>\d+\.\d+\.\d+\.\d+).*?"\w+ (?P<subdir>.*?) ')

for line in sys.stdin:
    match = pat.search(line)
    if match:
        # emit one "<ip>\t1" record per matching line
        print '%s\t%s' % (match.group('ip'), 1)
ipreducer.py:
##########################reducer #####################################
#!/usr/bin/python
# -*- coding: utf-8 -*-
from operator import itemgetter
import sys

dict_ip_count = {}

for line in sys.stdin:
    line = line.strip()
    ip, num = line.split('\t')
    try:
        num = int(num)
        dict_ip_count[ip] = dict_ip_count.get(ip, 0) + num
    except ValueError:
        # ignore lines whose count is not a number
        pass

# sort by IP so the output order is deterministic
sorted_dict_ip_count = sorted(dict_ip_count.items(), key=itemgetter(0))
for ip, count in sorted_dict_ip_count:
    print '%s\t%s' % (ip, count)
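The sample output above ends with a "sum of single ip" line that this reducer never prints. If that line came from a variant of ipreducer.py, the number of distinct IPs could be emitted with one extra line after the loop (a hypothetical addition, not the original code):

# emit the number of distinct IPs after the per-IP counts
print 'sum of single ip\t%s' % len(dict_ip_count)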
2) Example 2: count how many times each requested URL (subdirectory) appears in the access logs.
######################## Local test ######################################
cat /home/hadoop/Sep-2013/*/* | python subdirmapper.py | sort | python subdirreducer.py
Output:
http://dongxicheng.org/recommend/ 2
http://dongxicheng.org/search-engine/scribe-intro/trackback/ 1
http://dongxicheng.org/structure/permutation-combination/ 1
http://dongxicheng.org/structure/sort/trackback/ 1
http://dongxicheng.org/wp-comments-post.php 5
http://dongxicheng.org/wp-login.php/ 3535
http://hadoop123.org/administrator/index.php 4
####################### Running on Hadoop ########################################
bin/hadoop jar ./share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -mapper /data/hadoop/jobs_python/job_logstat/subdirmapper.py -reducer /data/hadoop/jobs_python/job_logstat/subdirreducer.py -input /log_original/* -output /log_subdirnum -file /data/hadoop/jobs_python/job_logstat/subdirmapper.py -file /data/hadoop/jobs_python/job_logstat/subdirreducer.py
Output:
http://dongxicheng.org/search-engine/scribe-intro/trackback/ 1
http://dongxicheng.org/structure/permutation-combination/ 1
http://dongxicheng.org/structure/sort/trackback/ 1
http://dongxicheng.org/wp-comments-post.php 5
http://dongxicheng.org/wp-login.php/ 3535
http://hadoop123.org/administrator/index.php 4
####################################### mapper: subdirmapper.py ###########################################
#!/usr/bin/python
# -*- coding: utf-8 -*-
import re
import sys

# same pattern as in ipmapper.py, but emit the requested path instead of the IP
pat = re.compile(r'(?P<ip>\d+\.\d+\.\d+\.\d+).*?"\w+ (?P<subdir>.*?) ')

for line in sys.stdin:
    match = pat.search(line)
    if match:
        print '%s\t%s' % (match.group('subdir'), 1)
####################################### reducer: subdirreducer.py ###########################################
#!/usr/bin/python
# -*- coding: utf-8 -*-
from operator import itemgetter
import sys

dict_subdir_count = {}

for line in sys.stdin:
    line = line.strip()
    subdir, num = line.split('\t')
    try:
        num = int(num)
        dict_subdir_count[subdir] = dict_subdir_count.get(subdir, 0) + num
    except ValueError:
        pass

# sort by URL so the output order is deterministic
sorted_dict_subdir_count = sorted(dict_subdir_count.items(), key=itemgetter(0))
for subdir, count in sorted_dict_subdir_count:
    print '%s\t%s' % (subdir, count)
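Because Hadoop Streaming sorts the mapper output by key before handing it to the reducer, the dictionary above is not strictly necessary. A sketch of a more memory-friendly variant (an alternative, not the original code; run locally it still needs a sort in front of it):

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Alternative reducer sketch: instead of accumulating every key in a
# dictionary, rely on the fact that Hadoop sorts mapper output by key,
# and aggregate one group of identical keys at a time.
import sys
from itertools import groupby

def read_pairs(stdin):
    for line in stdin:
        key, _, num = line.strip().partition('\t')
        yield key, num

for key, group in groupby(read_pairs(sys.stdin), key=lambda pair: pair[0]):
    total = 0
    for _, num in group:
        try:
            total += int(num)
        except ValueError:
            pass  # skip malformed counts
    print '%s\t%s' % (key, total)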
【Appendix: writing the MapReduce program in Python instead of Java MR】
Reference:
http://asfr.blogbus.com/logs/44208067.html
bin/hadoop jar ./share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -mapper /data/hadoop/mapper.py -reducer /data/hadoop/reducer.py -input /in/* -output /py_out -file /data/hadoop/mapper.py -file /data/hadoop/reducer.py
How Python MapReduce with Hadoop Streaming works:
》 It builds on the Linux pipe mechanism.
》 The mapper and the reducer read their input line by line from standard input.
》 They write their results to standard output.
For example:
cat 1.txt | grep 'dong' | sort
cat 1.txt | python grep.py | java sort.jar
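The grep.py in the second pipeline is only named, not listed; a minimal stand-in for grep 'dong' (a hypothetical sketch) could be:

#!/usr/bin/python
# minimal stand-in for grep 'dong': copy matching stdin lines to stdout
import sys

for line in sys.stdin:
    if 'dong' in line:
        sys.stdout.write(line)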
Reading from standard input:
c++: cin
c: scanf
Writing to standard output:
c++: cout
c: printf
So the Mapper and the Reducer can be written in any language, not only Java.
hadoop-streaming is what plugs such programs into the MapReduce framework.
For example, with C++, first compile the two programs:
g++ -o mapper mapper.cpp
g++ -o reducer reducer.cpp
Then test them locally:
cat test.txt | ./mapper | sort | ./reducer
mapper.py:
#!/usr/bin/python
# coding=utf-8
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
reducer.py:
#!/usr/bin/python
# coding=utf-8
from operator import itemgetter
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass

# sort the words lexicographically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print '%s\t%s' % (word, count)
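Before submitting the job, the pair can be checked with the same pipe pattern (a quick local sanity check, not part of the original listing):

echo "foo foo bar" | python mapper.py | sort | python reducer.py

This should print bar and foo with counts 1 and 2.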
Console output of the streaming job:
packageJobJar: [/data/hadoop/mapper.py, /data/hadoop/reducer.py, /data/hadoop/hadoop_tmp/hadoop-unjar4601454529868960285/] [] /tmp/streamjob2970217681900457939.jar tmpDir=null
14/03/21 16:23:09 INFO client.RMProxy: Connecting to ResourceManager at /192.168.2.200:8032
14/03/21 16:23:09 INFO client.RMProxy: Connecting to ResourceManager at /192.168.2.200:8032
14/03/21 16:23:10 INFO mapred.FileInputFormat: Total input paths to process : 2
14/03/21 16:23:10 INFO mapreduce.JobSubmitter: number of splits:2
14/03/21 16:23:10 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.cache.files.filesizes is deprecated. Instead, use mapreduce.job.cache.files.filesizes
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.cache.files is deprecated. Instead, use mapreduce.job.cache.files
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.mapoutput.value.class is deprecated. Instead, use mapreduce.map.output.value.class
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.cache.files.timestamps is deprecated. Instead, use mapreduce.job.cache.files.timestamps
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.mapoutput.key.class is deprecated. Instead, use mapreduce.map.output.key.class
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
14/03/21 16:23:10 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1394086709210_0008
14/03/21 16:23:10 INFO impl.YarnClientImpl: Submitted application application_1394086709210_0008 to ResourceManager at /192.168.2.200:8032
14/03/21 16:23:10 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1394086709210_0008/
14/03/21 16:23:10 INFO mapreduce.Job: Running job: job_1394086709210_0008
14/03/21 16:23:14 INFO mapreduce.Job: Job job_1394086709210_0008 running in uber mode : false
14/03/21 16:23:14 INFO mapreduce.Job: map 0% reduce 0%
14/03/21 16:23:19 INFO mapreduce.Job: map 100% reduce 0%
14/03/21 16:23:23 INFO mapreduce.Job: map 100% reduce 100%
14/03/21 16:23:24 INFO mapreduce.Job: Job job_1394086709210_0008 completed successfully
14/03/21 16:23:24 INFO mapreduce.Job: Counters: 43
File System Counters
FILE: Number of bytes read=47
FILE: Number of bytes written=248092
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=197
HDFS: Number of bytes written=25
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=5259
Total time spent by all reduces in occupied slots (ms)=2298
Map-Reduce Framework
Map input records=2
Map output records=4
Map output bytes=33
Map output materialized bytes=53
Input split bytes=172
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=53
Reduce input records=4
Reduce output records=3
Spilled Records=8
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=71
CPU time spent (ms)=1300
Physical memory (bytes) snapshot=678060032
Virtual memory (bytes) snapshot=2662100992
Total committed heap usage (bytes)=514326528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=25
File Output Format Counters
Bytes Written=25
14/03/21 16:23:24 INFO streaming.StreamJob: Output directory: /py_out
1. In Java development on Hadoop, the name of the current input file can be obtained with:
FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
String fileName = fileSplit.getPath().getName();
which returns the name of the file the current split belongs to.
2. In Python (streaming) development, the same information can be obtained as follows:
import os
os.environ["map_input_file"]
The environment variable map_input_file here corresponds to the job property map.input.file.
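As a sketch, a streaming mapper could use this to tag each output record with the file it came from (hypothetical example; on Hadoop 2 the variable may be exported as mapreduce_map_input_file instead, so both names are checked):

#!/usr/bin/python
# coding=utf-8
# Sketch: prefix each word with the name of the input file it came from.
import os
import sys

# Hadoop Streaming exports job properties as environment variables,
# with dots replaced by underscores; the exact name differs by version.
input_file = os.environ.get('map_input_file') \
    or os.environ.get('mapreduce_map_input_file', 'unknown')

for line in sys.stdin:
    for word in line.strip().split():
        print '%s:%s\t%s' % (input_file, word, 1)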