[root@node1 ~]# spark-submit --master yarn --deploy-mode client --class cn.hadron.AvgAge /root/simpleSpark-1.0-SNAPSHOT.jar input/age.txt
17/09/22 10:30:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/22 10:30:56 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Total Age:49536760;Number of People:1000000
Average Age is 49.53676
[root@node1 ~]#
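The source of AvgAge is not listed here; the following is a minimal sketch that would produce output in this format, assuming each line of age.txt holds an ID and an age separated by a space (the line format and field parsing are assumptions):

package cn.hadron

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object AvgAge {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      println("Usage: AvgAge AgeFile")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Average Age")
    val sc = new SparkContext(conf)
    // Assumption: each line is "ID age"; take the last field as the age
    val ages = sc.textFile(args(0)).map(_.split(" ").last.toLong)
    val count = ages.count()
    val totalAge = ages.reduce(_ + _)
    println("Total Age:" + totalAge + ";Number of People:" + count)
    println("Average Age is " + totalAge.toDouble / count)
    sc.stop()
  }
}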
[root@node1 data]# spark-shell
17/09/20 10:12:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/20 10:13:01 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/09/20 10:13:02 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/09/20 10:13:04 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.80.131:4040
Spark context available as 'sc' (master = local[*], app id = local-1505916766832).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val rdd1=sc.textFile("input/Hamlet.txt")
rdd1: org.apache.spark.rdd.RDD[String] = input/Hamlet.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd1.count
res0: Long = 6878
scala> val rdd2=rdd1.flatMap(x=>x.split(" ")).filter(_.size>1)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:26
scala> rdd2.take(2)
res1: Array[String] = Array(Hamlet, by)
scala> val rdd3=rdd2.map(x=>(x,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:28
scala> rdd3.take(2)
res2: Array[(String, Int)] = Array((Hamlet,1), (by,1))
scala> val rdd4=rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:30
scala> rdd4.take(3)
res3: Array[(String, Int)] = Array((rises.,1), (Let,35), (lug,1))
scala> val rdd5=rdd4.map{case(x,y)=>(y,x)}
rdd5: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[6] at map at <console>:32
scala> rdd5.take(2)
res4: Array[(Int, String)] = Array((1,rises.), (35,Let))
scala> val rdd6=rdd5.sortByKey(false)
rdd6: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[7] at sortByKey at <console>:34
scala> rdd6.take(2)
res5: Array[(Int, String)] = Array((988,the), (693,and))
scala> val rdd7=rdd6.map{case(a,b)=>(b,a)}
rdd7: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:36
scala> rdd7.take(10)
res6: Array[(String, Int)] = Array((the,988), (and,693), (of,621), (to,604), (my,441), (in,387), (HAMLET,378), (you,356), (is,291), (his,277))
scala> rdd7.take(10).foreach(println)
(the,988)
(and,693)
(of,621)
(to,604)
(my,441)
(in,387)
(HAMLET,378)
(you,356)
(is,291)
(his,277)
scala>
(4) Writing the complete program
package cn.hadron

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TopK {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      println("Usage: TopK KeyWordsFile K")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("TopK Key Words")
    val sc = new SparkContext(conf)
    val k = args(1).toInt
    val rdd1 = sc.textFile(args(0))
    // Same pipeline as the shell session above: split lines into words,
    // drop single-character tokens, count occurrences, sort by count descending
    val result = rdd1.flatMap(x => x.split(" "))
      .filter(_.size > 1)
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      .map { case (x, y) => (y, x) }
      .sortByKey(false)
      .map { case (a, b) => (b, a) }
    // Print the top K words, so the K argument is actually used
    // (the original hard-coded take(10) ignored it)
    result.take(k).foreach(println)
    sc.stop()
  }
}
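To run TopK, package it into a jar and submit it with spark-submit, following the pattern of the AvgAge run at the top of this section; the jar path below assumes TopK is built into the same jar, and K = 10 reproduces the result of the shell session:

[root@node1 ~]# spark-submit --master yarn --deploy-mode client --class cn.hadron.TopK /root/simpleSpark-1.0-SNAPSHOT.jar input/Hamlet.txt 10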