DistributedCache的使用

论坛 期权论坛 脚本     
匿名网站用户   2020-12-20 10:35   11   0

MapReduce Job中使用全局共享数据是不可避免的,但是根据该框架的特点,在代码级别实现全局变量是不现实的,主要是因为继承Mapper基类的Map阶段类和继承Reducer基类的Reduce阶段类的运行时独立的,并不像代码看起来的那样会共享一个Java虚拟机的资源。常用的在MapReduce编程中设置全局共享数据的方法主要有:

1.读写HDFS文件

数据文件放在HDFS上,Map task和Reduce task都可以进行读写HDFS预定好的同一个文件来实现全局共享数据。它是通过Java API来操作,但是该方法的缺点是:容易产生写冲突,覆盖原有数据。共享一些很小的全局数据需要使用I/O,这将占用系统资源。

2.配置Job属性

在执行MapReduce执行过程中,task可以读取Job的属性。可以在任务之初利用Configuration类的set方法将一些简单的全局数据封装到作业的配置属性中,然后在map task 或者reduce task中需要时利用Configuration类中get方法获取配置到属性的全局数据。这种方法的优点是简单,资源消耗小,但是对量比较大的共享数据显得比较无力。

3.使用DistributedCache

DistributedCache是MapReduce为应用提供缓存文件的只读工具,它可以缓存文本文件、压缩文件和jar文件等。在使用时,用户可以在作业配置时使用本地或HDFS文件的URL来将其设置成共享缓存文件。在作业启动之前,MapReduce框架会将可能需要的缓存文件复制到执行任务节点的本地。该方法的优点是每个Job共享文件只会在启动之后复制一次,并且它适用于大量的共享数据;而缺点是只读的。

下面是使用DistributedCache的步骤:

1)将要缓存的文件复制到HDFS上

bin/hadoop fs -copyFromLocal lookup /myapp/lookup

2)启用作业的属性配置,并设置待缓存文件

Configuration conf = new Configuration();

DistributedCache.addCacheFile(new URI("/myapp/lookup #lookup"),conf); #lookup你可以在程序中直接使用lookup来访问lookup这个文件。lookup是一个符号链接文件。

3 ) 在Map函数中使用DistributedCache

Configuration conf = context.getConfiguration();

Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);

Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);

贴上一个源码:

package cn.swpu;

import java.io.IOException;
import java.net.URI;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.*;


public class Driver {
 private static Logger logger = LoggerFactory.getLogger(Driver.class);

 public static class TokenMapper extends Mapper<Object,Text,Text,IntWritable> 
 {
  Path[] path = null;
  /**
     * Called once at the beginning of the task.
  */
    protected void setup(Context context ) throws IOException, InterruptedException 
    {
      logger.info("map setup");
      Configuration conf = context.getConfiguration();
      path = DistributedCache.getLocalCacheFiles(conf);
      System.out.println("the path[0] is:"+path[0].toString());
      FileSystem fsopen = FileSystem.getLocal(conf);
      FSDataInputStream in = fsopen.open(path[0]);
      Scanner scan = new Scanner(in);
      while(scan.hasNext())
      {
       System.out.println(Thread.currentThread().getName()+"the content is:"+scan.next());       
      }
      scan.close();
    }

     public  void map(Object key,Text value,Context context) throws IOException,InterruptedException
     {
      logger.info("map output");
      context.write(new Text(""), new IntWritable(0));
     }
     /**
     * Called once at the end of the task.
  */    
     protected void cleanup(Context context) throws IOException, InterruptedException 
     {
             logger.info("cleanup begin");
        }
     
     /**
      * Expert users can override this method for more complete control over the
      * execution of the Mapper.
      * @param context
      * @throws IOException
      */
     public void run(Context context) throws IOException, InterruptedException {
         setup(context);
         while (context.nextKeyValue()) {
           map(context.getCurrentKey(), context.getCurrentValue(), context);
         }
         cleanup(context);
       }
 }
 
 public static class  IntSumReduce extends Reducer<Text,IntWritable,Text,IntWritable>
 {
  
  protected void setup(Context context) throws IOException, InterruptedException
  {
   logger.info("reduce setup");
  }
  
  public void reduce(Text key,Iterable<IntWritable> values,Context context) 
    throws IOException,InterruptedException
  {
            System.out.println("reduce function");
     }
  
  protected void cleanup(Context context) throws IOException,InterruptedException
  {
   logger.info("cleanup reduce");
  }
  
  public void run(Context context) throws IOException,InterruptedException
  {
   setup(context);
   while(context.nextKey())
   {
    reduce(context.getCurrentKey(),context.getValues(),context);
   }
   cleanup(context);
  }
  
 }
 
   public static void main(String[] args) throws Exception
   {
    Configuration conf = new Configuration();
    
      String inputPath = "hdfs://master:9000/user/hadoop/input/";
      String outputPath = "hdfs://master:9000/user/hadoop/DistributedCache";
      
       Job job  = new Job(conf,"a");
       
       DistributedCache.addCacheFile(new URI("hdfs://master:9000/user/hadoop/input/a.txt"),conf);
       
       job.setJarByClass(Driver.class);
       System.out.println("run patten:"+conf.get("mapred.job.tracker"));
       FileSystem fs = FileSystem.get(job.getConfiguration());
       Path pout = new Path(outputPath);
       if(fs.exists(pout))
       {
        fs.delete(pout,true);
        System.out.println("the path exists,has been deleted!");
        
       }
       job.setMapOutputKeyClass(Text.class);
       job.setMapOutputValueClass(IntWritable.class);
       job.setMapperClass(TokenMapper.class);
       job.setReducerClass(IntSumReduce.class);
       FileInputFormat.setInputPaths(job, new Path(inputPath));
       FileOutputFormat.setOutputPath(job,new Path(outputPath));
       System.exit(job.waitForCompletion(true)?0:1);
   }


}


缓存的文件在DataNode的hadoop.tmp.dir这个属性的值下面,调试的信息也在该目录对应的文件夹下面。




分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:1136255
帖子:227251
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP