浅谈SparkSQL中 Broadcast Hash Join (BHJ) 的选择
发布网友
发布时间:2024-10-04 22:21
我来回答
共1个回答
热心网友
时间:2024-11-29 05:59
Broadcast Hash Join(BHJ)是SparkSQL用于分布式join操作的核心方法之一。在SQL中添加hint可指定使用BHJ实现join操作,但更多情况下,SparkSQL框架会自动选择是否采用BHJ。在Spark 3.0引入AQE特性后,BHJ的选择过程分为正常模式和AQE模式两个部分。
在正常模式下,SQL解析过程涉及将优化后的逻辑计划转化为物理计划。此过程中的策略(Strategy)负责将逻辑算子转化为对应的物理算子,并最终变成RDD的具体操作。在转换过程中,需要基于成本模型来选择最合适的算子实现join操作。在SparkStrategies类的apply方法中,选择BHJ需满足条件:如果一侧表足够小可以广播,且支持等值join,或者两侧表大小均小,则选择较小的表进行广播。判断表大小的逻辑通过获取估计统计值实现,使用org.apache.spark.sql.execution.SparkStrategies.JoinSelection类中的方法。若estimated statistics估计偏小,BHJ存在OOM风险;若估计偏大,则可能在表大小小于阈值时仍不采用BHJ。
当表一侧大小小于给定阈值(由spark.sql.autoBroadcastJoinThreshold参数设定)时,判定为足够小,选择采用BHJ。具体实现逻辑在JoinSelection类的createJoinWithoutHint方法中,通过canBroadcast方法检查。
AQE开启后,选择BHJ的过程更为动态。在Spark 3.0版本的AQE中,根据当前stage的执行情况进行优化,执行过程中的再优化通过调用reOptimize方法实现。在Spark 3.2版本中,针对AQE模式下的BHJ选择进行了微调。PR [SPARK-35264][SQL]在内容和代码层面进行了修改,优化了在AQE模式下的BHJ选择逻辑。
Statistics在何时计算、何时使用是另一个重要话题。大致结论是,Statistics的计算与使用在SQL解析过程中进行,帮助评估表大小并决定是否采用BHJ。不同情况下(正常模式还是AQE模式、是否开启cbo)Statistics的计算逻辑存在差异,AQE模式使用最新的Statistics,这一逻辑实现于databricks官网的Adaptive Query Execution部分。这部分的具体细节作为后续文章的主题,以深入探讨Statistics在不同场景下的作用和实现。