专业编程基础技术教程

网站首页 > 基础教程 正文

画像笔记21- (标签挖掘过程中)开发性能调优

ccvgpt 2024-08-04 12:24:47 基础教程 33 ℃

感觉下一个阶段学习的重点会是 spark rdd + scala 相关的编程(spark sql + scala)。

这章节原书内容比较简洁,还是要有一定的功底才能看懂。

画像笔记21- (标签挖掘过程中)开发性能调优

---------------------------------

开发性能调优

关于Spark 开发调优及Hive SQL脚本调优的书籍和博客已经有很多了,本章将侧重讲解在开发画像过程中可能遇到的一些共性问题,及对应的解决方案。

数据倾斜调优

数据倾斜是开发画像过程中常遇到的问题,当任务执行一直卡在map 100%、reduce 99%,最后的1% 花了几个小时都没执行完时,这时一般是遇到了数据倾斜。

问题出现的原因是当进行分布式计算时,由于某些节点需要计算的数据较多,导致其他节点的reduce阶段任务执行完成时,该节点的任务还没有执行完成,造成其他节点等待该节点执行完成的情况。

比如两条大表在join的时候大部分key对应10条数据,但是个别几个key 对应了100万条数据,对应10条数据的task很快执行完成了,但对应了100万数据的key则要执行几个小时。

图5-1 所示的是一个典型的例子。


bb这个key在3个节点上有11条数据,aa 和 cc在3个节点上分别由2条和1条数据,这些数据都会被拉取到task上处理。处理bb 这个task 的运行时间可能是处理aa和 cc的task的运行时间数倍,整体运行速度由最慢的task决定。

下面介绍两种解决数据倾斜问题的方案。

方案一:过滤掉倾斜数据

当少量key重复次数特别多,如果这种key 不是业务需要的key,可以直接过滤掉。这里有一张埋点日志表ods.page_event_log,需要和订单表dw.order_info_fact做join关联。

在执行Hive的过程中发现任务卡在map 100%、reduce 99%,最后的1% 一直运行不完。考虑应该是在join的过程中出现了数据倾斜,下面进行排查。

对于 ods.page_event_log 表查看出现次数最多的key:


将key 按出现次数从多到少排序(下图所示)


同样地,对订单表dw.order_info_fact 查看出现次数最多的key:


日志表key 按出现次数倒排序,下图(5-3)所示


从上面的例子可以看出,日志表和订单表通过cookieid进行join,当cookieid为0的时候,join操作将会产生142286*142286条数据,数据量如此庞大的节点系统无法处理过来。

同样当cookieid 为null 值和空值时也会出现这种情况,而且cookieid为这三个值时并没有实际的业务意义。因此在对两个表做关联时,排除掉这3个值以后,就可以很快的计算出结果了。

-----------------------------------------

注:

distribute by 和 sort by的作用:

https://blog.csdn.net/bitcarmanlee/article/details/51694616

上面的文章讲的比较好,简单来说,sort by 是局部排序,这样可以做到把一个大的reduce 任务发到多个上并行执行提高效率。而distribute by,让相同的 col 的值放到同一个reduce 里面。

根据执行计划很容易看出:相对于order by的一个job,sort by起了有两个job。第一个job现在每个reduce内部做局部排序,取top10。假设job1起了M个reduce,则第二个job再对M个reduce的输出做排序,

但此时输入的数据量只有M*10条,最后取前10条,就得到了我们要的top10。这样与order by的全局排序相比,如果数据量很大的话,效率将大大提高。

两个语句对比,很容易看出,加上distribute by以后,distribute by后面的col相同的值都被分配到了同一个reduce里。

版权声明:本文为CSDN博主「bitcarmanlee」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/bitcarmanlee/article/details/51694616

————————————————

方案二:引入随机数

数据按照类型进行group by时,会将相同的key 所需的数据拉取到一个节点进行聚合,而当某组数据量过大时,会出现其他组已经计算完成而当前任务未完成的情况。

可以考虑加入随机数,将原来的一组key 强制拆分为多组进行聚合。下面通过一个案例进行介绍。

现需要统计用户的订单量,执行如下代码:


用户维度表(当天的用户status_id 为1的用户)有2000万条数据, 订单表(用户添加购物车、收藏购物车、购买了货物 且地址在600,900 之间的用户,基于用户id进行排序)有10亿条数据。

任务在未优化前执行了1个小时也没有跑出结果,判断可能是出现了数据倾斜。

订单表中某些key值数量较多,在group by的过程中拉取到一个task上执行时,会出现其它task 执行完毕,等待该task 执行的情况。这里可以将原本相同的key 通过添加随机前缀的方式变成多个key,这样将原本一个task 处理的key 分散到多个task 上先做一次聚合,然后去掉前缀再进行一次聚合得到最终结果。

如下图5-4 所示:


修改后代码执行如下:


comment:

差别的是第二个SQL循环多了一个内循环。

where pay_status in (1,3) 选择 pay_status 为1,3的订单事实表 group by user_id,round(rand()*1000),根据用户id及round 值做分组。

相比之前:

where site_id in (600,900) 地址在600 或 900 ,and order_status_id in (1,2,3) 而且订单状态在1,2,3,收藏,购买,购物车里 三个状态。

前面的SQL内循环,分组的时候(group by) ,根据两个元素进行分组做得更细。(userid+round(rand()*1000).

这里round(rand()*1000) 表示有1000个分组了。

相当于拆分1000个reduce ,然后再做第二次合并。

---------------------------------------------------------

Hive SQL里面 round(rand()*1000) 是什么含义?

参考https://blog.csdn.net/qq_35835118/article/details/99735944

rand() 返回一个0到1之间double 类型的随机值:

round() 浮点数四舍五入 :


round(1.4) : 1

round(1.5) : 2


5.2 合并小文件

在Spark 执行 "insert overwrite table 表名”的语句时,由于多线程并行向HDFS写入且RDD默认分区为200个,因此默认情况下会产生200个小文件。

Spark 中可以使用reparation 或 coalesce 对RDD的分区重新进行划分,reparation 是coalesce 接口中shuffle为true的实现。

在spark内部会·对每一个分区分配一个task 执行,如果task 过多,那么每个task处理的数据量很小,这就会造成线程频繁在task之间切换,导致集群工作效率低下。为解决这个问题,常用RDD重分区函数来减少分区数量,将小分区合并为大分区,从而提高集群工作效率。


-----------------------------------

关于spark sql 里面的函数 coalesce(1)

参考:

https://www.cnblogs.com/fillPv/p/5392186.html

这两个方法有什么区别,看看源码就知道了:coalesce只能减少分区,而repartition可以减少和增加

Spark Rdd coalesce()方法和repartition()方法

在Spark的Rdd中,Rdd是分区的。

有时候需要重新设置Rdd的分区数量,比如Rdd的分区中,Rdd分区比较多,但是每个Rdd的数据量比较小,需要设置一个比较合理的分区。或者需要把Rdd的分区数量调大。还有就是通过设置一个Rdd的分区来达到设置生成的文件的数量。

有两种方法是可以重设Rdd的分区:分别是 coalesce()方法和repartition()。

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)

: RDD[T] = withScope {

if (shuffle) {

/** Distributes elements evenly across output partitions, starting from a random partition. */

val distributePartition = (index: Int, items: Iterator[T]) => {

var position = (new Random(index)).nextInt(numPartitions)

items.map { t =>

// Note that the hash code of the key will just be the key itself. The HashPartitioner

// will mod it with the number of total partitions.

position = position + 1

(position, t)

}

} : Iterator[(Int, T)]


// include a shuffle step so that our upstream tasks are still distributed

new CoalescedRDD(

new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),

new HashPartitioner(numPartitions)),

numPartitions).values

} else {

new CoalescedRDD(this, numPartitions)

}

}

-------------------------------------------

关于 stripMargin 含义:

解决该问题的方法就是应用scala的stripMargin方法,在scala中stripMargin默认是“|”作为出来连接符,在多行换行的行头前面加一个“|”符号即可。

代码实例:

val speech = """abc

|def""".stripMargin

运行的结果为:

abc

ldef

\\\

-----------------------------------------

Spark RDD 的含义:

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。

RDD,英文全称叫 Resilient Distributed Datasets。an RDD is a read-only, partitioned collection of records. 字面意思是只读的分布式数据集。

RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

但其实个人觉得可以把 RDD 理解为关系数据库里的一个个操作,比如 map,filter,Join 等。在 Spark 里面实现了许多这样的 RDD 类,即可以看成是操作类。

还有一个区别就是,RDD 计算后的中间结果是可以被持久化,当下一次需要使用时,可以直接使用之前持久化好的结果,而不是重新计算,

并且这些结果被存储在各个结点的 executor 上。下一次使用时,调度器可以直接把 task 分发到存储持久化数据的结点上,减少数据的网络传输开稍。这种场景在数据挖掘迭代计算是经常出现。

val links = spark.textFile(…).map(…).persist() var ranks = // RDD of (URL, rank) pairs

for (i <- 1 to ITERATIONS) {

// Build an RDD of (targetURL, float) pairs // with the contributions sent by each page val contribs = links.join(ranks).flatMap {

(url, (links, rank)) =>

links.map(dest => (dest, rank/links.size)) }

// Sum contributions by URL and get new ranks

ranks = contribs.reduceByKey((x,y) => x+y)

.mapValues(sum => a/N + (1-a)*sum) }

以上代码生成的 RDD 执行树如下图所示:


参考:

https://www.cnblogs.com/qingyunzong/p/8899715.html

参考:

http://lxw1234.com/archives/2016/05/666.htm

--------------------------------------

Tags:

最近发表
标签列表