|
这里用DistributedCache(分布式缓存)来解决join算法实现中的数据倾斜问题,同样考虑那篇博文的两张表,订单表和产品表(需求就是根据外键商品id来将两张表信息合并)。
订单号 时间 商品id 购买数量
1001,20170710,P0001,1
1002,20170710,P0001,3
1003,20170710,P0002,3
1004,20170710,P0002,4
商品id 商品名称
P0001,xiaomi
P0002,huawei
拼接成
1001 ,20170710,P0001,1 xiaomi
1002,20170710,P0001,3 xiaomi
1003,20170710,P0002,3,huawei
1004,20170710,P0002,4,huawei
考虑问题:在前面博文的mapreduce程序中,如果某些产品非常畅销,肯定会产生很多订单,但是刚好这些订单信息都传到了一个reduce中(分区默认就是使用hashcode%reducetask数量,所以这种情况是正常的)。那么这个reducetask压力就很大了,而其他的reducetask处理的信息就很小,有的甚至就处理几条数据,这就出现了数据倾斜问题。
解决方案:一般来说订单表的数据远远多于产品表数据,毕竟产品的种类就那些,所以我们可以把产品信息都交给maptask就行了逻辑都让maptask来处理,也就是说不使用reduce了,而让每个maptask持有个product.data(存储产品信息的文件)即可。那么maptask怎么获得这个文件呢?刚好hadoop提供了DistributedCache,我们将文件交给这个分布式缓存,它会将我们的文件放到maptask的工作目录中,那么map端可以直接从工作目录中去拿。
实现:
public class MapJoin {
static class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
Map<String, String> produceMap = new HashMap<String,String>();
Text k = new Text();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("product.data")));
String line = null;
while((line=br.readLine())!=null){
String[] fields = line.split(",");
produceMap.put(fields[0], fields[1]);
}
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
String productName = produceMap.get(fields[2]);
k.set(line+"\t"+productName);
context.write(k, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MapJoin.class);
job.setMapperClass(MapJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.addCacheFile(new URI("hdfs://192.168.25.127:9000/mapjoincache/product.data"));
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean ex = job.waitForCompletion(true);
System.exit(ex?0:1);
}
}
测试:
将工程打包上传到集群中去
创建两个订单文件和产品文件(orders.data和product.data,内容就是上面的内容)
创建输入目录/mapjoincache/input,将orders.data传上去
将产品文件放到/mapjoincache下。
运行程序
[root@mini3 ~]
1001,20170710,P0001,1
1002,20170710,P0001,3
1003,20170710,P0002,3
1004,20170710,P0002,4
[root@mini3 ~]
P0001,xiaomi
P0002,huawei
[root@mini3 ~]
[root@mini3 ~]
[root@mini3 ~]
[root@mini3 ~]
[root@mini3 ~]
Found 2 items
-rw-r--r-- 2 root supergroup 0 2017-10-16 21:16 /mapjoincache/output/_SUCCESS
-rw-r--r-- 2 root supergroup 116 2017-10-16 21:16 /mapjoincache/output/part-m-00000
[root@mini3 ~]
1001,20170710,P0001,1 xiaomi
1002,20170710,P0001,3 xiaomi
1003,20170710,P0002,3 huawei
1004,20170710,P0002,4 huawei
|