MapReduce Implementations, Part 4: A Small Composite Example (Read Data from HBase, Count Words, and Write the Top 3 in Descending Order to HDFS)


The MapReduce implementation series:

MapReduce Implementations, Part 1: Converting Data Between HBase and HDFS

MapReduce Implementations, Part 2: Sorting

MapReduce Implementations, Part 3: Top N

MapReduce Implementations, Part 4: A Small Composite Example (Read Data from HBase, Count Words, and Write the Top 3 in Descending Order to HDFS)

MapReduce Implementations, Part 5: Distinct and Count

MapReduce Implementations, Part 6: Max, Sum, and Avg

MapReduce Implementations, Part 7: A Small Composite Example (Chaining Multiple Jobs to Compute an Average)

MapReduce Implementations, Part 8: Partition

MapReduce Implementations, Part 9: PV and UV

MapReduce Implementations, Part 10: Inverted Index

MapReduce Implementations, Part 11: Join


Method 1:

Create the corresponding table in HBase and insert the test data:

create 'hello','cf'
put 'hello','1','cf:hui','hello world'
put 'hello','2','cf:hui','hello hadoop'
put 'hello','3','cf:hui','hello hive'
put 'hello','4','cf:hui','hello hadoop'
put 'hello','5','cf:hui','hello world'
put 'hello','6','cf:hui','hello world'
put 'hello','7','cf:hui','hbase hive'

Java code:

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseTopJiang1 {  
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
     String tablename = "hello";
     Configuration conf = HBaseConfiguration.create();
     conf.set("hbase.zookeeper.quorum", "h71");
     Job job = new Job(conf, "WordCountHbaseReader");
     job.setJarByClass(HbaseTopJiang1.class);
     Scan scan = new Scan();
     TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job);
     job.setReducerClass(WordCountHbaseReaderReduce.class);
     FileOutputFormat.setOutputPath(job, new Path(args[0]));
     MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
     System.exit(job.waitForCompletion(true) ? 0 : 1);
    }  
      
    public static class doMapper extends TableMapper<Text, IntWritable>{  
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text(); 
        @Override  
        protected void map(ImmutableBytesWritable key, Result value,  
                Context context) throws IOException, InterruptedException { 
            /* To take the whole value as one token without splitting on spaces:
               String rowValue = Bytes.toString(value.list().get(0).getValue());
               context.write(new Text(rowValue), one);
             */
            String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");
            for (String str : rowValue) {
                word.set(str);
                context.write(word, one);
            }
        }  
    }  
    
    public static final int K = 3; 
    public static class WordCountHbaseReaderReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Use a TreeMap to hold the aggregated counts. A TreeMap keeps its keys in ascending
        // order, so a Comparator is supplied explicitly to reverse the order (largest count first).
        private TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
            @Override
            public int compare(Integer x, Integer y) {
                return y.compareTo(x);
            }
        });
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Accumulate the reduced result into the TreeMap instead of writing it to the context here.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            if (treeMap.containsKey(sum)) {
                String value = treeMap.get(sum) + "," + key.toString();
                treeMap.put(sum, value);
            } else {
                treeMap.put(sum, key.toString());
            }
            // Keep only the K largest counts: with the descending comparator,
            // lastKey() is the smallest entry in the map.
            if (treeMap.size() > K) {
                treeMap.remove(treeMap.lastKey());
            }
        }
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Write the TreeMap contents to the context in value-key order (word list first, then count).
            for (Integer key : treeMap.keySet()) {
                context.write(new Text(treeMap.get(key)), new IntWritable(key));
            }
        }
    }
}  

Compile and run the code on Linux:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HbaseTopJiang1.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseTopJiang1*class
[hadoop@h71 q1]$ hadoop jar xx.jar HbaseTopJiang1 /out

[hadoop@h71 q1]$ hadoop fs -cat /out/part-r-00000
hello 6
world 3
hadoop,hive 2
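
The core of the reducer above is the size-bounded TreeMap with a reversed comparator. The same Top-3 pattern, pulled out of MapReduce into a minimal standalone sketch (plain Java; the class name TopKSketch and the word counts are hypothetical stand-ins for what the reducer accumulates):

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class TopKSketch {
    public static void main(String[] args) {
        final int K = 3;
        // Reversed comparator: the TreeMap keeps its keys (the counts) in descending order.
        TreeMap<Integer, String> top = new TreeMap<Integer, String>(new Comparator<Integer>() {
            @Override
            public int compare(Integer x, Integer y) {
                return y.compareTo(x);
            }
        });
        // Hypothetical word counts, as the reducer would see them.
        Map<String, Integer> counts = new HashMap<String, Integer>();
        counts.put("hello", 6);
        counts.put("world", 3);
        counts.put("hadoop", 2);
        counts.put("hive", 2);
        counts.put("hbase", 1);
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            int sum = e.getValue();
            // Words with the same count share one key and are joined with commas.
            if (top.containsKey(sum)) {
                top.put(sum, top.get(sum) + "," + e.getKey());
            } else {
                top.put(sum, e.getKey());
            }
            // lastKey() is the smallest count under the reversed comparator,
            // so trimming to K entries keeps only the K largest counts.
            if (top.size() > K) {
                top.remove(top.lastKey());
            }
        }
        for (Integer sum : top.keySet()) {
            System.out.println(top.get(sum) + "\t" + sum);
        }
    }
}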


Method 2:

truncate 'hello'
put 'hello','1','cf:hui','hello world world'
put 'hello','2','cf:hui','hello hadoop hadoop'
put 'hello','3','cf:hui','hello hive hive'
put 'hello','4','cf:hui','hello hadoop hadoop'
put 'hello','5','cf:hui','hello world world'
put 'hello','6','cf:hui','hello world world'
put 'hello','7','cf:hui','hbase hive hive'
Note: the intended separator between the repeated words is "\t" (the Tab key), but it turns out that a tab character simply cannot be typed into a value through the HBase shell, so this method falls apart as written; the general approach is still worth referring to.
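
The limitation is in the shell's input handling, not in HBase itself: values are arbitrary byte arrays, so a tab-separated value can still be written with the Java client. Below is a minimal sketch, assuming the same table, column family and ZooKeeper quorum as above and the older HTable/Put API that the rest of this post is written against (on HBase 1.x+ you would obtain a Table from a Connection and call Put.addColumn instead); the class name PutWithTab is just for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutWithTab {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h71");
        HTable table = new HTable(conf, "hello");
        try {
            // Row 1 with a literal tab between the two identical words.
            Put put = new Put(Bytes.toBytes("1"));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("hui"), Bytes.toBytes("hello world\tworld"));
            table.put(put);
        } finally {
            table.close();
        }
    }
}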

Java code:

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseTopJiang2{
 public static class doMapper extends TableMapper<Text, IntWritable>{  
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text(); 
  @Override  
  protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { 
   /* To take the whole value as one token without splitting on spaces:
      String rowValue = Bytes.toString(value.list().get(0).getValue());
      context.write(new Text(rowValue), one);
    */
   String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");
   for (String str : rowValue) {
    word.set(str);
    context.write(word, one);
   }
  }  
 }   

 public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
   int total = 0;
   for (IntWritable val : values) {
    total += val.get();
   }
   context.write(key, new IntWritable(total));
  }
 }
    
 public static final int K = 3;  
 /**
  * Swap the key and value of the previous job's output; once the count becomes the key,
  * the records can be sorted by it.
  */
 public static class KMap extends Mapper<LongWritable,Text,IntWritable,Text> {
  TreeMap<Integer, String> map = new TreeMap<Integer, String>();
  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
   String line = value.toString();
   String result[] = line.split("\t");
   StringBuffer hui = null;
   if (result.length > 2) {
    // Defensive branch for word tokens that themselves contain \t: rejoin every field
    // except the trailing count. (As noted above, the HBase shell cannot actually insert
    // tab characters, so in practice each line is just "word\tcount".)
    hui = new StringBuffer();
    for (int i = 0; i < result.length - 1; i++) {
     if (i > 0) {
      hui.append("\t");
     }
     hui.append(result[i]);
    }
   } else {
    hui = new StringBuffer().append(result[0]);
   }
   if (line.trim().length() > 0 && line.indexOf("\t") != -1) {
    // The count is always the last tab-separated field.
    Integer num = Integer.parseInt(result[result.length - 1]);
    if (map.containsKey(num)) {
     String value1 = map.get(num) + "," + hui;
     map.put(num, value1);
    } else {
     map.put(num, hui.toString());
    }
    // The TreeMap is in its default ascending order, so firstKey() is the smallest count;
    // trimming the map to K entries keeps only the K largest counts.
    if (map.size() > K) {
     map.remove(map.firstKey());
    }
   }
  }  
  @Override
  protected void cleanup(Mapper<LongWritable, Text, IntWritable, Text>.Context context)  
    throws IOException, InterruptedException {  
   for(Integer num : map.keySet()) {  
    context.write(new IntWritable(num), new Text(map.get(num)));
   }  
  }  
 } 
 
 /**
  * Partition by the size of the key (the key is an int count).
  */
 public static class KeySectionPartitioner<K, V> extends Partitioner<K, V> {
  @Override
  public int getPartition(K key, V value, int numReduceTasks) {
   /**
    * The hashCode of an Integer is the int value itself.
    */
   // Keys of maxValue or above are placed in the first partition (partition 0).
   int maxValue = 50;
   int keySection = 0;
   // Only keys below maxValue need to be spread over partitions, and only when there is more
   // than one reduce task; otherwise everything goes to partition 0.
   if (numReduceTasks > 1 && key.hashCode() < maxValue) {
    int sectionValue = maxValue / (numReduceTasks - 1);
    int count = 0;
    while ((key.hashCode() - sectionValue * count) > sectionValue) {
     count++;
    }
    keySection = numReduceTasks - 1 - count;
   }
   return keySection;
  }
 }
      
 /**
  * Sort the int keys in descending order.
  */
 public static class IntKeyDescComparator extends WritableComparator {
  protected IntKeyDescComparator() {
   super(IntWritable.class, true);
  }
  @Override
  public int compare(WritableComparable a, WritableComparable b) {
   return -super.compare(a, b);
  }
 }
      
 /**
  * Swap the key and value back when writing the output.
  */
 public static class SortIntValueReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
  private Text result = new Text();
  @Override
  public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
   for (Text val : values) {
    result.set(val.toString());
    context.write(result, key);
   }
  }
 }
  
 public static void main(String[] args) throws Exception {
  String tablename = "hello";
  Configuration conf = HBaseConfiguration.create();
  conf.set("hbase.zookeeper.quorum", "h71");
  Job job1 = new Job(conf, "WordCountHbaseReader");
  job1.setJarByClass(HbaseTopJiang2.class);
  Scan scan = new Scan();
  TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job1);
  job1.setReducerClass(WordCountReducer.class);
  FileOutputFormat.setOutputPath(job1, new Path(args[0]));
  MultipleOutputs.addNamedOutput(job1, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
       
  Job job2 = Job.getInstance(conf, "Topjiang");
  job2.setJarByClass(HbaseTopJiang2.class);
  job2.setMapperClass(KMap.class);
  job2.setSortComparatorClass(IntKeyDescComparator.class);
  job2.setPartitionerClass(KeySectionPartitioner.class);
  job2.setReducerClass(SortIntValueReduce.class);
  job2.setOutputKeyClass(IntWritable.class);
  job2.setOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(job2, new Path(args[0]));
  FileOutputFormat.setOutputPath(job2, new Path(args[1]));

  // Submit job1 and, only if it succeeds, job2, waiting for each to complete.
  if (job1.waitForCompletion(true)) {
   System.exit(job2.waitForCompletion(true) ? 0 : 1);
  }
 }
}
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HbaseTopJiang2.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseTopJiang2*class
[hadoop@h71 q1]$ hadoop jar xx.jar HbaseTopJiang2 /out /output


[hadoop@h71 q1]$ hadoop fs -ls /out
-rw-r--r-- 2 hadoop supergroup 0 2017-03-18 19:02 /out/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 32 2017-03-18 19:02 /out/part-r-00000
[hadoop@h71 q1]$ hadoop fs -ls /output
-rw-r--r-- 2 hadoop supergroup 0 2017-03-18 19:02 /output/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 25 2017-03-18 19:02 /output/part-r-00000


Ideal results (what the output would have looked like if the tab-separated values could actually be inserted):
[hadoop@h71 q1]$ hadoop fs -cat /out/part-r-00000
hbase 1
hadoop hadoop 2
hello 6
hive hive 2
world world 3
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hello 6
world world 3
hadoop hadoop,hive hive 2
(all of the separators above are tab characters)


I also found that a tab character copies from UltraEdit into SecureCRT just fine, but copying from SecureCRT back into UltraEdit turns the tab into spaces, which was rather exasperating.
