spark 从1.x 转到2.x,编写程序的的一些区别

论坛 期权论坛 脚本     
匿名网站用户   2020-12-19 22:37   11   0

spark 2.x 版本相对于1.x版本,有挺多地方的修改,一是类似于flatMapRDD 中 iteator iteatable之类的区别
2是类似于dataset的一些问题

下面是2.x版本的iteatable和iteartor之类的区别,只举例了两个,其实只要和iteartor有关的都有了修改

flatMap

        JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] split = s.split("\\s+");
                return Arrays.asList(split).iterator();
            }
        });

flatMapToPair java

        JavaPairRDD<String, Integer> wordPairRDD = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
            @Override
            public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
                ArrayList<Tuple2<String, Integer>> tpLists = new ArrayList<Tuple2<String, Integer>>();
                String[] split = s.split("\\s+");
                for (int i = 0; i <split.length ; i++) {
                    Tuple2 tp = new Tuple2<String,Integer>(split[i], 1);
                    tpLists.add(tp);
                }
                return tpLists.iterator();
            }
        });

spark中初始化driver的区别

spark2.0中,可以使用session来创建一个sparkContext作为一个新的入口,具体参考例子就可以了

jar包的区别

spark2.x版本中不再有spark-assembly-xxx jar包,jar包全都在.jars 中

scala的版本

spark2.x版本的,对scala的版本最低要求是2.11

下面是sql中的区别

2.x 版本的 sparkSql中
1.x 版本的 DataFrame与Dataset 统一化了,只剩下DataSet了,具体的也可以直接参看官方给的spark sql 的例子即可
具体 todo

iterator and iterable 共用

import java.util.Iterator;

public class MyIterator<T> implements Iterator, Iterable 
{
 private Iterator myIterable;

 public MyIterator(Iterable iterable)
 {
  myIterable = iterable.iterator();
 }

 @Override
 public boolean hasNext() 
 {
  return myIterable.hasNext();
 }

 @Override
 public Object next() 
 {
  return myIterable.next();
 }

 @Override
 public void remove() 
 {
  myIterable.remove();
 }

 @Override
 public Iterator iterator() 
 {
  return myIterable;
 }
}

使用方法

        JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public MyIterator<String> call(String s) throws Exception {
                String[] split = s.split("\\s+");
                MyIterator myIterator = new MyIterator(Arrays.asList(split));
                return myIterator;
            }
});

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:1136255
帖子:227251
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP