python 实现Hadoop的partitioner和二次排序

论坛 期权论坛 脚本     
匿名技术用户   2020-12-29 07:38   32   0

我们有以下应用场景:

a.txt,b.txt两个文件,其中,由于两个文件的数据量非常庞大,导致我们使用一个reduce任务搞不定,因此需要设定为2个reduce任务。

但业务部提出的要求是要对两个文件进行排序。这样的话,对于多reduce任务,使用原有的排序方式是不可用的。因此,

我们进行对Partitioner二次排序。

我们采用python 来实现mapreduce :

map.py:

#!/usr/bin/python
import sys


base_count = 10000
for line in sys.stdin:
key,val = line.strip().split('\t')
new_key=base_count + int(key)
red_idx=1
if new_key < (10100+10000)/2:
red_idx = 0
print "%s\t%s\t%s" % (red_idx,new_key,val)

red.py:

#!/usr/bin/python
import sys
base_value=10000
for line in sys.stdin:
red_idx,key,val = line.strip().split('\t')
print "%s\t%s\t%s" % red_idx,(str(int(key) - base_value),val)



使用shell脚本调用我们的map,reduce:

#!/usr/bin/bash
HADOOP_CMD="/usr/java/hadoop/bin/hadoop"
STREAM_JAR_PATH="/usr/java/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.2.0.
jar"
INPUT_FILE_PATH_1="/user/hadoop/a.txt"
INPUT_FILE_PATH_2="/user/hadoop/b.txt"
OUTPUT_PATH="/user/hadoop/output"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1,$INPUT_FILE_PATH_2 \
-output $OUTPUT_PATH \
-mapper "python map.py " \
-reducer "python red.py " \
-jobconf "mapred.reduce.tasks=2" \
-jobconf "stream.num.map.output.key.fields=2" \
-jobconf "num.key.fields.for.partition=1" \
-jobconf "mapred.job.name=sortfile_demo" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-file ./map.py \
-file ./red.py \


这里为了保证二次排序,需要设定如下参数:

-jobconf "mapred.reduce.tasks=2" ###设定reduce的任务数量
-jobconf "stream.num.map.output.key.fields=2" ####设定map输出的前两个字段作为key
-jobconf "num.key.fields.for.partition=1" ####在设定的key的字段中,我们把第一个字段作为partition
-jobconf "mapred.job.name=sortfile_demo" ####设定job的名字
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner ###引入包





分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP