|
网上现有的wordcount一般包括:scala、Java、python版本,且为基于RDD数据格式的方法,该方法较简单,但门槛较高,若采用dataframe的方法,则结果更好理解,但查询函数的过程较为复杂,网上没有找到合适的版本,所以自己写一个,可以采用sparksql的语句进行wordcount的计算。代码如下: from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import split, explode
spark = SparkSession.builder.appName("wc").getOrCreate()
lines = spark.read.text("q.txt")
words=lines.select(explode(split(lines.value, "\s")).alias("word"))
#lines.select(explode(split(lines.value, "\s")).alias("word")).show()
#values=sorted(words.groupBy(['word']).count().collect())
#print(values)
words.createTempView("ttt");
df = spark.sql("select word, count(word) from ttt group by word ")
df.show(100)
##部分为list效果的展示,后面为创建表的形式来进行sql语句的统计和计算,explode和split函数组合起来可以按空格方式进行切割,并单独保存到一行。运行结果如下: +----------------+-----------+ | word|count(word)| +----------------+-----------+ | Kubernetes,| 1| | complex| 1| | data,| 1| | using| 1| | access| 1| | you| 1| | can| 1| | for| 1| | Mesos,| 1| | SQL.| 1| | Combine| 1| | love| 1| | query| 1| | diverse| 1| | in| 2| | physical| 1| | Write| 1| | both| 1| | streaming| 1| | R,| 1| | cloud.| 1| | SQL,| 1| | Apache| 2| |state-of-the-art| 1| | on| 1| | execution| 1| | achieves| 1| | optimizer,| 1| | engine.| 1| | data| 1| | runs| 1| | batch| 1| | It| 1| | quickly| 1| | Scala,| 1| | the| 1| | high| 1| | Java,| 1| | DAG| 1| | i| 1| | analytics.| 1| | and| 4| | Hadoop,| 1| | applications| 1| | Python,| 1| | performance| 1| | Spark| 2| | a| 3| | standalone,| 1| | scheduler,| 1| | streaming,| 1| | or| 1| | sources.| 1| +----------------+-----------+
|