本文来自:http://blog.csdn.net/liangyihuai/article/details/54925737
本文使用了两种方法进行spark 的top k词频查询,第一种方法在很多博客中都介绍到了的,但是这种方法有一个问题,那就是在大数据的情况下效率不高,因为它是通过sparkAPI中的top方法来计算的,这个过程会引起一个耗时的“洗牌“过程;第二种方法在其他博客中基本没有看到,使用的是堆的方式,具体为采用immutable.TreeMap这个自带排序功能的类,但是需要我们稍微修改一下,让它能够根据value的大小而不是key来排序。这个方法没有“洗牌“但有一个汇集数据的过程,但是这个动作所涉及到的数据量是比较小的(每个分区的k个数据),而且它只是汇聚数据,而不是耗时的shuffle,所以这个方法在效率优于第一种方法
如何让immutable.TreeMap根据value值的大小自动排序呢?我们知道treemap默认是根据key值的大小自动排序的。为了实现这个功能,另外增加了一个辅助的数据结构:mutable.HashMap,该hashmap用于在覆写Ordering的compare方法的时候通过key值来查询出相应的value,通过比较value来构造compare方法的返回值(正数、负数或者0)。这里需要注意的是,这里的compare方法不能返回0,因为返回0的话treemap中具有相同value的数据不能同时存在,即使key是不用的,具体原因跟scala的treemap功能实现有关,希望读者能动手验证一下。
具体的spark环境搭建和代码提交过程本文从略。有问题可以留言。
import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
import scala.collection.immutable.TreeMap
import scala.collection.mutable
/**
* 统计文本中词频最高的k个单词
* 使用两种方法:第一种方法使用排序的方式,大数据的情况效率不高;第二种方法使用堆排序的方式,适合大数据的方式。
* 下面的指令是本人提交代码用到的:
/home/liangyh/installed/spark/bin/spark-submit \
--master local \
--class TopK \
/home/liangyh/IdeaProjects/SparkTest/out/artifacts/SparkTest_jar/SparkTest.jar
* Created by liangyh on 12/18/16.
*/
object TopK {
private val logger = LoggerFactory.getLogger(getClass.getName)
val inputFileLocation:String = "hdfs://Master:9000/user/liangyh/input.txt";
val outputFileLocation:String = "hdfs://Master:9000/user/liangyh/output";
def main(args: Array[String]): Unit = {
logger.info("---------------start---------")
val sparkConf = new SparkConf().setAppName("TopK");
sparkConf.setMaster("spark://Master:7077");
val sc = new SparkContext(sparkConf)
// val hdfs = org.apache.hadoop.fs.FileSystem.get(
// new java.net.URI("hdfs://127.0.0.1:9000"), new org.apache.hadoop.conf.Configuration())
// val path = new Path(this.outputFileLocation)
// if(hdfs.exists(path)) hdfs.delete(path, true)
logger.info("--------------read file----------------")
val lines = sc.parallelize(List("aa a aa b bb b d b e f g h aa a a a d g "),2)
doTopK1(lines)
logger.info("---------done top k--------------")
sc.stop
}
/**
* 第一种top k方式
* @param lines
*/
def doTopK1(lines:RDD[String]):Unit = {
//计算每一个单词的词频
val wordCountRDD = lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_+_)
//排序
val sorted = wordCountRDD.map{case(key,value) => (value,key)}.sortByKey(true,3)
//得到词频最高的4个单词
val topk = sorted.top(4)
//print
topk.foreach(println)
}
/**
* 第二种topk方式
* @param lines
*/
def doTopK2(lines:RDD[String]):Unit = {
//计算每个单词的词频
val wordCountRDD:RDD[(String,Int)] = lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_+_)
//在每一个分区内进行top k查询
val topK= wordCountRDD.mapPartitions(iter => {
val partitionHeap = new Heap()
while(iter.hasNext){
partitionHeap.putToHeap(iter.next())
}
partitionHeap.getHeap().iterator
})
val driverHeap = new Heap()
//将每个分区中统计出来的top k合并成一个新的集合,再统计新集合中的top k。
topK.collect().foreach(driverHeap.putToHeap(_))
driverHeap.getHeap().foreach(next => logger.info("------"+next._1+"->"+next._2+"-----"))
}
}
/**
* 一个能够根据treeMap的value大小降序排序的堆。
* @param k 保留前面k个数
*/
class Heap(k:Int = 4){
/**
* 辅助数据结构,加快查找速度
*/
private val hashMap:mutable.Map[String,Int] = new mutable.HashMap[String, Int]()
implicit val valueOrdering = new Ordering[String]{
override def compare(x: String, y: String): Int = {
val xValue:Int = if(hashMap.contains(x)) hashMap.get(x).get else 0
val yValue:Int = if(hashMap.contains(y)) hashMap.get(y).get else 0
if(xValue > yValue) -1 else 1
}
}
/**
*存储有序数据
*/
private var treeMap = new TreeMap[String, Int]()
/**
* 把数据存入堆中
* 自动截取,只保留前面k个数据
* @param word
*/
def putToHeap(word:(String,Int)):Unit = {
hashMap += (word._1 -> word._2)
treeMap = treeMap + (word._1 -> word._2)
val dropItem = treeMap.drop(k)
dropItem.foreach(treeMap -= _._1)
treeMap = treeMap.take(this.k)
}
/**
* 取出堆中的数据
* @return
*/
def getHeap():Array[(String,Int)] = {
val result = new Array[(String, Int)](treeMap.size)
var i = 0
this.treeMap.foreach(item => {
result(i) = (item._1, item._2)
i += 1
})
result
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
贴一下结果:
17/02/08 12:32:55 INFO TopK$: ==collect ==> aa-3
17/02/08 12:32:55 INFO TopK$: ==collect ==> b-3
17/02/08 12:32:55 INFO TopK$: ==collect ==> d-2
17/02/08 12:32:55 INFO TopK$: ==collect ==> h-1
17/02/08 12:32:55 INFO TopK$: ==collect ==> a-4
17/02/08 12:32:55 INFO TopK$: ==collect ==> g-2
17/02/08 12:32:55 INFO TopK$: ==collect ==> e-1
17/02/08 12:32:55 INFO TopK$: ------a->4-----
17/02/08 12:32:55 INFO TopK$: ------aa->3-----
17/02/08 12:32:55 INFO TopK$: ------b->3-----
17/02/08 12:32:55 INFO TopK$: ------d->2-----