问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

Spark的join什么情况下可以避免shuffle?

发布网友 发布时间:2022-11-21 08:09

我来回答

1个回答

热心网友 时间:2023-10-15 07:25

Spark的join操作可能触发shuffle操作。shuffle操作要经过磁盘IO,网络传输,对性能影响比较大。本文聊一聊Spark的join在哪些情况下可以避免shuffle过程。

针对Spark DataFrame/DataSet的join,可以通过broadcast join和bucket join来避免shuffle操作。

Broadcast join很好理解,小表被分发到所有executors,所以不需要做shuffle就可以完成join. Spark SQL控制自动broadcast join的参数是:spark.sql.autoBroadcastJoinThreshold , 默认为10MB. 就是说当join中的一张表的size小于10MB时,spark会自动将其封装为broadcast发送到所有结点,然后进行broadcast join. 当然也可以手动将join中的某张表转化成broadcast : 

                 sparkSession.sparkContext.broadcast(df)

Bucket join其实就是将要join的两张表按照join columns(或join columns的子集)根据相同的partitioner预先做好分区,并将这些分区信息存储到catalog中(比如HiveExternalCatalog);然后在读取这两张表并做join时,spark根据bucket信息将两张表的相同partition进行join即可,从而避免了shuffle的过程。注意,这里是避免了shuffle过程,并没有完全避免网络传输,由于两张表的相同partition不一定在同一台机器上,所以这里仍需要对其中一张表的partition进行网络传输。关于spark bucketing的原理和使用细节可以参见这个 视频 。

笔者这里想讨论的是PairRDDFunctions类的join方法。在RDD对象中有一个隐式转换可以将rdd转换成PairRDDFunctions对象,这样就可以直接在rdd对象上调用join方法:

先来看看PairRDDFunctions的join方法:

PairRDDFunctions有多个重载的join方法,上面这个只带一个RDD对象的参数,我们接着看它调用的另一个重载的join方法:

可以看到,RDD的join实现是由cogroup方法完成的,cogroup完后得到的是类型为RDD[(K, (Iterable[V], Iterable[W]))]的rdd对象,其中K为key的类型,V为第一张join表的value类型,W为第二张join表的value类型;然后,调用flatMapValues将其转换成RDD[(K, V, W)]的rdd对象。

下面来看看PairRDDFunctions.cogroup方法的实现:

cogroup中生成了CoGroupedRDD对象,所以关键是这个RDD的getDependencies方法返回的dependencies中是否存在shuffle dependency.

看看这个RDD的getDependencies方法:

其中的rdds就是进行cogroup的rdd序列,也就是PairRDDFunctions.cogroup方法中传入的 Seq(self, other)  .

重点来了,对于所有参与cogroup的rdd,如果它的partitioner和结果CoGroupedRDD的partitioner相同,则该rdd会成为CoGroupedRDD的一个oneToOne窄依赖,否则就是一个shuffle依赖,即宽依赖。

我们知道,只有宽依赖才会触发shuffle,所以RDD的join可以避免shuffle的条件是: 参与join的所有rdd的partitioner都和结果rdd的partitioner相同。

那么,结果rdd的partitioner是怎么确定的呢?上文讲到PairRDDFunctions.join方法有多个重载,其中就有可以指定partitioner的重载,如果没有指定,则使用默认的partitioner,看看默认的partitioner是怎么确定的:

简单地说就是:

1. 如果父rdds中有可用的合格的partitioner,则直接使用其中分区数最大的那个partitioner;

2. 如果没有,则根据默认分区数生成HashPartitioner.

至于怎样的partitioner是合格的,请读者阅读上面的Partitioner.defaultPartitioner方法和Partitioner.isEligiblePartitioner方法。

RDD的compute方法是真正计算得到数据的方法,我们来看看CoGroupedRDD的compute方法是怎么实现的:

可以看到,CoGroupedRDD的数据是根据不同的依赖从父rdd中获取的:

1. 对于窄依赖,直接调用父rdd的iterator方法获取对应partition的数据

2. 对于宽依赖,从shuffleManager获取shuffleReader对象进行读取,这里就是shuffle read了

还有一个重点是读取多个父rdds的数据后,怎么将这些数据根据key进行cogroup?

这里用到了ExternalAppendOnlyMap来构建key和grouped values的映射。先来看看createExternalMap的实现:

相关类型定义如下:

可以看到,ExternalAppendOnlyMap的构造函数的参数是是三个方法参数:

1. createCombiner : 对每个key创建用于合并values的combiner数据结构,在这里就是一个CoGroup的数据,数组大小就是dependencies的数量

2. mergeValue : 将每个value合并到对应key的combiner数据结构中,在这里就是将一个CoGroupValue对象添加到其所在rdd对应的CoGroup中

3. mergeCombiners : 合并相同key的两个combiner数据结构,在这里就是合并两个CoGroupCombiner对象

CoGroupedRDD.compute会调用ExternalAppendOnlyMap.insertAll方法将从父rdds得到的数据一个一个地插入到ExternalAppendOnlyMap对象中进行合并。

最后,以这个ExternalAppendOnlyMap对象作为参数构造InterruptibleIterator,这个iterator会被调用者用于访问CoGroupedRDD的单个partition的所有数据。

本文简单地介绍了DataFrame/DataSet如何避免join中的shuffle过程,并根据源码详述了RDD的join操作的具体实现细节,分析了RDD的join在什么情况下可以避免shuffle过程。

1. 源码版本:2.4.0

2. 水平有限,如有错误,望读者指出
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
如何查被录取到的专业 怎样查被录取的专业 录取后怎样查询录取的专业 录取专业怎么查 已被录取怎么查专业 ghs网络语什么意思_ghs网络语意思出处含义介绍 纸箱企业管理软件 ghs什么意思网络(ghs什么意思网络用语) 《喜羊羊与灰太狼》大结局 0与任何数相加都得原数吗? pandas数据分析-常用命令 Spark DataSet常用action,及操作汇总 永清环保业绩发布?永清环保股价低?永清环保股 大跌? 永清环保股票今日公告?永清环保还可以买吗?300187永清环保分红? 签了保密协议可以随意辞职吗 20200618-day4-what's that old proverb? 那句老话怎么说来着? 东风风神a30机油传感器怎样更换? 压力变送器、压力传感器的温漂是多少? 抖音商品橱窗怎么开通的? 井水抽到喷泉池后加了些漂白粉,然后就出现红褐色的沉淀,加了明矾没什... 你好EVR-AL00怎么数据连接不上 10斤鲜辣椒能磨出多少辣椒粉呢 阿里巴巴如何进入互联网金融 大学生接触互联网金融应该注意什么 互联网金融的七种模式 蒸煮桶机械版的好还是电脑版的好 大枪传说勋章礼盒换什么? 尿酸正常标准是多少? DNF:21日活动看不明白?跳过过程直接看结果了解下 柳桃花对空气有害吗 excelvba遍历文件夹里的所有表格添加同一页 美式橄榄球的全队灵魂传球手为什么叫四分卫(Quarterback)? 光大信托理财师待遇 杭州高级理财师工资收入多少 科目二考试有时间限定吗? 成都大华 鼻综合做的好不好啊 费用大概多少钱呢? 不是自己的实名会怎么样? 为什么我的登上去不是我原来的号? 为什么我的登上去不是我原来的号? 登上不是自己怎么办? 荣耀畅玩30的u盘储存在哪里 新号码注册微信显示不是我的继续注册后一样吗? 手机卡是本人,不是本人怎么解决? 微信不小心点了,不是我的登录,怎么找回原来的? 大家都喜欢智能珠宝有什么功能 求助:电热水器应该买多大的合适 三口人 TCL D65A620U,康佳A65U,长虹65U3C,创维65M6E,乐视x65,哪个更好_百度知 ... 初中英语名词分类? 王者荣耀荆轲不同时期出装 成都生育险报销需要什么资料