|
转自https://blog.csdn.net/qq_36235275/article/details/82502352
要求运用的滚瓜烂熟
将RDD,DataFrame,DataSet之间进行互相转换
RDD -》 DataFrame
直接手动转换
scala> val people = spark.read.json("/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.json") people: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> val people1 = sc.textFile("/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt") people1: org.apache.spark.rdd.RDD[String] = /opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt MapPartitionsRDD[18] at textFile at <console>:24 scala> val peopleSplit = people1.map{x => val strs = x.split(",");(strs(0),strs(1).trim.toInt)} peopleSplit: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[19] at map at <console>:26 scala> peopleSplit.collect res6: Array[(String, Int)] = Array((Michael,29), (Andy,30), (Justin,19)) scala> peopleSplit.to toDF toDS toDebugString toJavaRDD toLocalIterator toString top scala> peopleSplit.toDF res7: org.apache.spark.sql.DataFrame = [_1: string, _2: int] scala> peopleSplit.toDF("name","age") res8: org.apache.spark.sql.DataFrame = [name: string, age: int] scala> res8.show +-------+---+ | name|age| +-------+---+ |Michael| 29| | Andy| 30| | Justin| 19| +-------+---+ 通过Scala编程实现 ## 创建 schema scala> val schema = StructType(StructField("name",StringType)::StructField("age",IntegerType)::Nil) schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true)) ## 加载RDD数据 scala> val rdd = sc.textFile("/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt") rdd: org.apache.spark.rdd.RDD[String] = /opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:30 ## 创建Row对象 scala> val data = rdd.map{x => val strs = x.split(",");Row(strs(0),strs(1).trim.toInt)} data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[2] at map at <console>:32 ## 生成DF scala> spark.createDataFrame(data,schema) 18/09/06 09:45:00 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0 18/09/06 09:45:00 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException 18/09/06 09:45:02 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException res0: org.apache.spark.sql.DataFrame = [name: string, age: int] 反射 scala> case class People(name:String,age:Int) defined class People scala> rdd.map{x => val strs=x.split(",");People(strs(0),strs(1).trim.toInt)}.toDF res2: org.apache.spark.sql.DataFrame = [name: string, age: int] DataFrame -》 RDD
scala> res8.rdd res10: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[26] at rdd at <console>:31 RDD -》 DataSet
scala> peopleSplit.toDS res11: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int] scala> case class People(name:String,age:Int) defined class People scala> val peopleDSSplit = people1.map{x => val strs = x.split(","); People(strs(0),strs(1).trim.toInt)} peopleDSSplit: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[27] at map at <console>:28 scala> peopleDSSplit.toDS res12: org.apache.spark.sql.Dataset[People] = [name: string, age: int] scala> res12.show +-------+---+ | name|age| +-------+---+ |Michael| 29| | Andy| 30| | Justin| 19| +-------+---+ DataSet -》 RDD
scala> res12.rdd res14: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[32] at rdd at <console>:33 scala> res14.map(_.name).collect res15: Array[String] = Array(Michael, Andy, Justin) DataSet -》 DataFrame
scala> res12.toDF res16: org.apache.spark.sql.DataFrame = [name: string, age: int] DataFrame -》 Datset
scala> res16.as[People] res17: org.apache.spark.sql.Dataset[People] = [name: string, age: int] |