spark解析CSV文件

论坛 期权论坛 脚本     
匿名技术用户   2021-1-3 05:27   65   0
import java.util


import org.apache.spark.sql.types.{DataTypes, StructField}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}


/**
 * Reads a headered CSV file of call records with Spark, dumps it to the console,
 * and rebuilds the first twelve columns as a DataFrame with an explicit,
 * all-string, nullable schema.
 *
 * Usage: `TelephoneData13 [csvPath]` — when no path is given, the original
 * hard-coded local path is used.
 */
object TelephoneData13 {

  /** Names assigned, in order, to the first twelve columns of the CSV. */
  private val ColumnNames: Array[String] = Array(
    "id", "caseID", "evID", "localNum", "startTime", "position",
    "method", "dialNumber", "callDuration", "starFlag", "week", "hours")

  /** Input used when no path is passed on the command line. */
  private val DefaultPath: String = "F:\\ideaWorkspace\\hello\\data\\yy.csv"

  def main(args: Array[String]): Unit = {
    val path = args.headOption.getOrElse(DefaultPath)
    val conf = new SparkConf().setMaster("local").setAppName("TelephoneData13")
    val sc = new SparkContext(conf)
    // Always release the SparkContext, even if reading or transforming fails.
    try {
      val sQLContext = new SQLContext(sc)
      // No inferSchema option is set, so every column is read as a string.
      // Note: the Databricks spark-csv package names this option "delimiter";
      // Spark's built-in CSV source accepts "sep". Comma is the default in both.
      val df = sQLContext.read.format("com.databricks.spark.csv")
        .option("sep", ",")
        .option("header", "true")
        .load(path)
      // With master "local" the executor runs in this JVM, so this prints to the console.
      df.foreach(x => println(x))
      df.printSchema()
      df.show(1000, false)

      val width = ColumnNames.length
      val available = df.columns.length
      // Fail fast on the driver instead of with an index error inside a task.
      require(available >= width,
        s"Expected at least $width columns in $path but found $available")

      // getString may return null for empty CSV fields; keep the null as-is
      // (the schema below is nullable) rather than calling .toString on it.
      val temp = df.rdd.map(line => Row.fromSeq((0 until width).map(line.getString)))

      val st = DataTypes.createStructType(
        ColumnNames.map(name => DataTypes.createStructField(name, DataTypes.StringType, true)))
      val newdf = sQLContext.createDataFrame(temp, st)

      // Optional MySQL export, disabled. Load credentials from configuration,
      // never from source code.
      // val prop = new Properties()
      // prop.setProperty("user", "root")
      // prop.setProperty("password", "123456")
      // newdf.registerTempTable("calllistbean_bak")
      // val sqlcommand = "select caseID,evID,localNum,startTime,position,method,dialNumber,callDuration,starFlag,week,hours from calllistbean_bak"
      // sQLContext.sql(sqlcommand).write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.16.102.180:3306/bdcloud", "calllistbean_bak", prop)

      newdf.show()
    } finally {
      sc.stop()
    }
  }
}
分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP