博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark性能调优04-算子调优
阅读量:7211 次
发布时间:2019-06-29

本文共 3783 字,大约阅读时间需要 12 分钟。

1、使用MapPartitions代替map

  1.1 为什么要死使用MapPartitions代替map

    普通的map,每条数据都会传入function中进行计算一次;而是用MapPartitions时,function会一次接受所有partition的数据出入到function中计算一次,性能较高。

    但是如果内存不足时,使用MapPartitions,一次将所有的partition数据传入,可能会发生OOM异常

  1.2 如何使用

    有map的操作的地方,都可以使用MapPartitions进行替换

/**         * 使用mapPartitionsToPair代替mapToPair         */        JavaPairRDD
sessionRowPairRdd =dateRangeRdd.mapPartitionsToPair(new PairFlatMapFunction
, String, Row>() { private static final long serialVersionUID = 1L; public Iterable
> call(Iterator
rows) throws Exception { List
> list=new ArrayList
>(); while (rows.hasNext()) { Row row=rows.next(); list.add(new Tuple2
(row.getString(2), row)); } return list; } }); /*JavaPairRDD
sessionRowPairRdd = dateRangeRdd .mapToPair(new PairFunction
() { private static final long serialVersionUID = 1L; // 先将数据映射为
public Tuple2
call(Row row) throws Exception { return new Tuple2
(row.getString(2), row); } });*/

2、使用coalesce对过滤后的Rdd进行重新分区和压缩

  2.1 为什么使用coalesce

    默认情况下,经过过滤后的数据的分区数和原分区数是一样的,这就导致过滤后各个分区中的数据可能差距很大,在之后的操作中造成数据倾斜

    使用coalesce可以使过滤后的Rdd的分区数减少,并让每个分区中的数据趋于平等

  2.2 如何使用   

//过滤符合要求的ClickCategoryIdRow         filteredSessionRdd.filter(new Function
, Boolean>() { private static final long serialVersionUID = 1L; public Boolean call(Tuple2
tuple2) throws Exception { return (Long.valueOf(tuple2._2.getLong(6))!=null)?true:false; } }) //使用coalesce将过滤后的数据重新分区和压缩,时新的分区中的数据大致相等 .coalesce(100)

3、使用foreachPartition替代foreach

  3.1 为什么使用foreachPartition

    默认使用的foreach,每条数据都会传入function进行计算;如果操作数据库,每条数据都会获取一个数据库连接并发送sql进行保存,消耗资源比较大,性能低。

    使用foreachPartition,会把所用partition的数据一次出入function,只需要获取一次数据库连接,性能高。

  3.2 如何使用

/**         * 使用foreachPartition替代foreach         */        sessionRdd.join(sessionRowPairRdd).foreachPartition(new VoidFunction
>>>() { private static final long serialVersionUID = 1L; public void call(Iterator
>> iterator) throws Exception { List
sessionDetails=new ArrayList
(); if (iterator.hasNext()) { Tuple2
> tuple2=iterator.next(); String sessionId=tuple2._1; Row row=tuple2._2._2; SessionDetail sessionDetail=new SessionDetail(); sessionDetail.setSessionId(sessionId); sessionDetail.setTaskId((int)taskId); sessionDetail.setUserId((int)row.getLong(1)); sessionDetails.add(sessionDetail); } DaoFactory.getSessionDetailDao().batchInsertSessionDao(sessionDetails); } }); /* sessionRdd.join(sessionRowPairRdd).foreach(new VoidFunction
>>() { private static final long serialVersionUID = 1L; public void call(Tuple2
> tuple2) throws Exception { String sessionId=tuple2._1; Row row=tuple2._2._2; SessionDetail sessionDetail=new SessionDetail(); sessionDetail.setSessionId(sessionId); sessionDetail.setTaskId((int)taskId); sessionDetail.setUserId((int)row.getLong(1));   DaoFactory.getSessionDetailDao().insertSessionDao(sessionDetail); } });*/

 4、使用repartition进行调整并行度

  4.1 为什么要使用repartition

    spark.default.parallelism设置的并行度只能对没有Spark SQL(DataFrame)的阶段有用,对Spark SQL的并行度是无法设置的,该并行度是通过hdfs文件所在的block块决定的。

    可以通过repartition调整之后的并行度

  4.2 如何使用 

sqlContext.sql("select * from user_visit_action where date >= '" + startDate + "' and date <= '" + endDate + "'").javaRDD()    //使用repartition调整并行度    .repartition(100)

 5、使用reduceByKey进行本地聚合

  5.1 reduceByKey有哪些优点

    reduceByKey相对于普通的shuffle操作(如groupByKey)的一个最大的优点,会进行map端的本地聚合,从而减少文件的输出,减少磁盘IO,网络传输,内存占比以及reduce端的聚合操作数据。

  5.2 使用场景

    只有是针对每个不同的key进行相应的操作都可以使用reduceByKey进行处理

转载于:https://www.cnblogs.com/lifeone/p/6439130.html

你可能感兴趣的文章
应“云”而生--云时代的运维新理念
查看>>
RFID能否让实体零售业度过“寒冬”?
查看>>
Swagger2接口注释参数使用数组
查看>>
“IP的力量”专题论坛成功举办,聚焦行业共话IP与VR AR技术新融合
查看>>
Ubuntu下使用UFW配置防火墙(简化iptables的操作)
查看>>
OpenStack快速入门-queens版本
查看>>
大数据驱动智能制造 物联网引爆工业革命商机
查看>>
一个比较完善的购物车类
查看>>
「镁客·请讲」Visense Studio冯樑杰:游戏基因的VR视频,最好的表现是真人实拍交互...
查看>>
让人欲罢不能的量子学
查看>>
美团在Redis上踩过的一些坑-2.bgrewriteaof问题
查看>>
C# StreamReader.ReadLine统计行数的问题
查看>>
异常测试实践与梳理
查看>>
多者异也
查看>>
tf:'hello tensorflow'
查看>>
RedisConf2018记录--Day 1 sessions
查看>>
CentOS的el5, el6, el7代表什么
查看>>
柏林纪行(中):Node.js Collaboration Summit
查看>>
IT网络通信大变革时代来临 2016中国极客大奖为您找到风向标
查看>>
如何下载WDK
查看>>