MapReduce WordCount with Python: Getting Started (Part 1)

Preface

With the Hadoop 2.7.2 cluster already set up, we can start running jobs on it, beginning with a simple WordCount. We will use Python, a scripting language that makes development and debugging quicker and more convenient.

MapReduce Overview

The MapReduce data flow is as follows: raw input is processed by the mappers, then partitioned and sorted, and finally arrives at the reducers, which write out the result.

Hadoop Streaming Overview

Hadoop itself is written in Java, and MapReduce programs are normally written in Java as well, but with Hadoop Streaming we can write them in any language and have Hadoop run them.

The source code for Hadoop Streaming can be found in Hadoop's GitHub repo. In short, you pass a mapper and a reducer written in another language as arguments to a pre-built Java program (the *-streaming.jar shipped with Hadoop). This Java program creates the MapReduce job, launches a separate process for the mapper, feeds it its input over stdin, and hands whatever the mapper writes to stdout back to Hadoop. After partitioning and sorting, it launches another process for the reducer, again exchanging data over stdin/stdout, and so produces the final result. All our program in the other language has to do, then, is read data from stdin and write processed data to stdout; the Java wrapper in Hadoop Streaming takes care of the tedious steps in between and runs the program in a distributed fashion.

The STDIN-to-STDOUT stage can be thought of as running inside Hadoop Streaming: Hadoop Streaming supplies the standard input and output, which is how languages other than Java get to execute MapReduce.
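To see how little the wrapper actually requires, the classic minimal streaming job uses ordinary shell programs as mapper and reducer; anything that reads stdin and writes stdout will do. A sketch (the jar path and HDFS paths are illustrative and must be adapted to your installation):

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    -input /some/hdfs/input \
    -output /some/hdfs/output \
    -mapper /bin/cat \
    -reducer /usr/bin/wc
# mapper: /bin/cat simply echoes each input line to stdout
# reducer: /usr/bin/wc counts the lines/words/bytes of its partition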


Mapper Code

The mapper reads data from standard input, loops over each line, strips and splits it, and for every word it encounters emits a line of the form word<TAB>1.

#!/usr/bin/env python
# coding=utf-8
import sys

# read lines from stdin and emit "word<TAB>1" for every word found
for line in sys.stdin:
    ss = line.strip().split(' ')
    for word in ss:
        if word:  # skip empty tokens produced by consecutive spaces
            print "%s\t%d" % (word, 1)
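A quick way to sanity-check the mapper on its own, with no Hadoop involved, is to pipe a line of text into it (the sample sentence here is made up):

echo "hello world hello" | python map.py
# prints:
# hello	1
# world	1
# hello	1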

Reducer Code

The reducer likewise reads from standard input, tallies how many times each word appears, and finally prints the totals.

#!/usr/bin/env python
# coding=utf-8
import sys

cur_word = None
cur_sum = 0

# input arrives sorted by key, so identical words are adjacent;
# accumulate a running sum and emit it whenever the key changes
for line in sys.stdin:
    ss = line.strip().split('\t')
    if len(ss) != 2:
        continue
    word, value = ss
    if cur_word is None:
        cur_word = word
    if cur_word == word:
        cur_sum += int(value)
    else:
        print "%s\t%d" % (cur_word, cur_sum)
        cur_word = word
        cur_sum = int(value)

# flush the count for the last word
if cur_word is not None:
    print "%s\t%d" % (cur_word, cur_sum)
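Note that the reducer assumes its input is already sorted by key, which is exactly what Hadoop's partition-and-sort phase guarantees. A quick local check (a POSIX shell is assumed):

printf "a\t1\nb\t1\na\t1\n" | sort | python reduce.py
# prints:
# a	2
# b	1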

Local Debugging (on the server)

Use head on the file (cat works just as well) to write it to standard output, pipe that into map.py as its standard input, sort on the first column (the word), and then feed the sorted output of map.py into reduce.py as its standard input to do the counting:

head -1 the_man_of_property.txt | python map.py | sort -k1 | python reduce.py | sort -k2 -rn | head -10

This pipeline simulates the map and reduce phases locally for debugging; once the program is confirmed to be correct, it can be submitted to the cluster.

Running MapReduce on the Cluster

The input and output paths in the script below are directories/files on HDFS; before running it, the file to be counted must first be put onto HDFS.
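Uploading the input file might look like this (the paths match the script below; adjust them to your own layout):

hadoop fs -mkdir -p /lishijia/input
hadoop fs -put the_man_of_property.txt /lishijia/input/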

HADOOP_CMD="/home/lishijia/Soft/hadoop-2.7.2/bin/hadoop"
STREAM_JAR_PATH="/home/lishijia/Soft/hadoop-2.7.2/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar"

#INPUT_FILE_PATH_1 = "/"
INPUT_FILE_PATH_1="/lishijia/input/the_man_of_property.txt"
OUTPUT_PATH="/lishijia/output/mapreduce/"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.

$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py" \
-reducer "python reduce.py" \
-file ./map.py \
-file ./reduce.py
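Once the script finishes, the result can be read straight off HDFS; a single reducer writes one part-00000 file by default. This assumes hadoop is on the PATH, otherwise use the full path stored in HADOOP_CMD:

chmod +x run.sh && ./run.sh
hadoop fs -ls /lishijia/output/mapreduce/
hadoop fs -cat /lishijia/output/mapreduce/part-00000 | sort -k2 -rn | head -10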

Execution Log

[root@master201 mapreduce]# ./run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /lishijia/output/mapreduce
18/10/07 22:17:33 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./map.py, ./reduce.py, /tmp/hadoop-unjar5713376120282440056/] [] /tmp/streamjob6497965494106718597.jar tmpDir=null
18/10/07 22:17:33 INFO client.RMProxy: Connecting to ResourceManager at master201/192.168.152.201:8032
18/10/07 22:17:34 INFO client.RMProxy: Connecting to ResourceManager at master201/192.168.152.201:8032
18/10/07 22:17:34 INFO mapred.FileInputFormat: Total input paths to process : 1
18/10/07 22:17:34 INFO mapreduce.JobSubmitter: number of splits:2
18/10/07 22:17:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1538880566647_0003
18/10/07 22:17:35 INFO impl.YarnClientImpl: Submitted application application_1538880566647_0003
18/10/07 22:17:35 INFO mapreduce.Job: The url to track the job: http://master201:8088/proxy/application_1538880566647_0003/
18/10/07 22:17:35 INFO mapreduce.Job: Running job: job_1538880566647_0003
18/10/07 22:17:45 INFO mapreduce.Job: Job job_1538880566647_0003 running in uber mode : false
18/10/07 22:17:45 INFO mapreduce.Job: map 0% reduce 0%
18/10/07 22:17:49 INFO mapreduce.Job: map 100% reduce 0%
18/10/07 22:17:54 INFO mapreduce.Job: map 100% reduce 100%
18/10/07 22:17:54 INFO mapreduce.Job: Job job_1538880566647_0003 completed successfully
18/10/07 22:17:55 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=1076619
FILE: Number of bytes written=2515154
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=709544
HDFS: Number of bytes written=181530
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=4383
Total time spent by all reduces in occupied slots (ms)=1729
Total time spent by all map tasks (ms)=4383
Total time spent by all reduce tasks (ms)=1729
Total vcore-milliseconds taken by all map tasks=4383
Total vcore-milliseconds taken by all reduce tasks=1729
Total megabyte-milliseconds taken by all map tasks=4488192
Total megabyte-milliseconds taken by all reduce tasks=1770496
Map-Reduce Framework
Map input records=2866
Map output records=111818
Map output bytes=852977
Map output materialized bytes=1076625
Input split bytes=224
Combine input records=0
Combine output records=0
Reduce input groups=16985
Reduce shuffle bytes=1076625
Reduce input records=111818
Reduce output records=16984
Spilled Records=223636
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=134
CPU time spent (ms)=2790
Physical memory (bytes) snapshot=699613184
Virtual memory (bytes) snapshot=6329569280
Total committed heap usage (bytes)=483393536
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=709320
File Output Format Counters
Bytes Written=181530
18/10/07 22:17:55 INFO streaming.StreamJob: Output directory: /lishijia/output/mapreduce/

Summary

Above, we ran a mapper and reducer written in Python on the cluster via Hadoop Streaming to count how many times each word appears in a file. Consider it an introduction to MapReduce, one of Hadoop's core components.

Code: https://github.com/lishijia/py-data-demo/tree/master/mapreduce

Related: Hadoop 2.7.2 Cluster Installation
