1.Blaze:SparkSQL Native 算子优化在快手的算k算设计与实践
2.Sparkä¸cacheåpersistçåºå«
3.SPARK-38864 - Spark支持unpivot源码分析
4.Spark repartitionåcoalesceçåºå«
5.Spark原理详解
Blaze:SparkSQL Native 算子优化在快手的设计与实践
Blaze:SparkSQL Native 算子优化在快手的设计与实践
在当前时代,Spark因其相比Hive的源码强大性能,已成为众多公司主要的详解执行引擎。随着业务的算k算不断扩展和数据规模的提升,对Spark性能提升的源码追求从未停止。Spark性能提升主要集中在两个方向:执行计划优化和运行时执行效率优化。详解视频源码自动采集
分享将围绕Blaze项目展开,算k算探索其在快手的源码设计与实践。Blaze是详解一个基于Apache DataFusion项目封装的向量化执行引擎中间件,旨在充分利用Spark分布式计算框架的算k算优势,同时发挥DataFusion Native向量化算子执行的源码性能优势。
首先,详解回顾Spark的算k算发展历程,从1.0版本的源码解释执行模型到2.0版本的多算子编译,再到3.0版本引入的详解Adaptive Query Execution(AQE)自适应执行引擎,直至未来的优化方向——向量化执行。
接着,探讨向量化执行的进展,包括Velox、花卷云软件源码Gluten、Photon和Native Codegen等公司的探索与实践,以及它们如何结合Spark生态进行性能提升。
Blaze项目在快手的探索始于两年前,经过持续迭代,目前已具备上线使用的能力。 Blaze基于Apache DataFusion项目封装,通过扩展组件将Spark生成的物理执行计划转换为对应的Native执行计划,进而传递给底层的DataFusion执行引擎。
整体架构包括Spark on Blaze架构的流向,展示如何通过Blaze Extension组件将Spark执行流程与DataFusion Native执行引擎连接起来,以及执行计划转换、Native计划生成与提交的过程。核心组件包括Blaze Session Extension、Plan SerDe、JNI Gateways和Native Operators。
执行过程分为物理执行计划的转换、Native计划生成与提交,资料付费下载源码以及Native执行三个阶段。物理执行计划转换部分涉及算子检查、翻译与优化策略,以确保转换对性能的影响降到最低。Native计划生成与提交涉及序列化与JNI传递,而Native执行则通过DataFusion框架异步执行。
Blaze项目在实际应用中实现了对TPC-DS数据集的高效处理,支持大量业务UDF调用,并提供了内存管理、UDF兼容等优化。此外,项目还实现了对特定算子的深度定制优化,如排序、聚合等。
当前进展包括算子覆盖度的提升、基准测试结果、线上收益及未来工作规划。算子覆盖度基本按照线上使用频率推进,公式源码已更改支持常见的算子,对某些算子进行优化和改进。性能测试显示在特定场景下能够实现高达倍的性能提升。线上应用已实现部分生产ETL任务的性能提升,单任务性能提升最多可达四倍以上。
未来工作计划包括持续优化算子支持、大规模推广、接口抽象以支持更多引擎以及回馈开源社区。项目已开放初步可用版本,欢迎更多开发者参与共建。
本文总结了Blaze项目在快手的设计与实践,展示了其在Spark性能提升方面的探索与应用,为业界提供了宝贵的参考与借鉴。
Sparkä¸cacheåpersistçåºå«
cache
ããé»è®¤æ¯å°æ°æ®åæ¾å°å åä¸ï¼ææ§è¡
ããdef cache(): this.type = persist()
ããpersist
ããå¯ä»¥æå®æä¹ åç级å«ã
ããæ常ç¨çæ¯MEMORY_ONLYåMEMORY_AND_DISKã
ããâ_2â表示æå¯æ¬æ°ãå°½éé¿å 使ç¨_2åDISK_ONLY级å«
ããcacheåpersistç注æç¹
ãã1.é½æ¯ææ§è¡(æçå«å»¶è¿æ§è¡)ï¼éè¦action触åæ§è¡ï¼æå°åä½æ¯partition
ãã2.对ä¸ä¸ªRDDè¿è¡cacheæè persistä¹åï¼ä¸æ¬¡ç´æ¥ä½¿ç¨è¿ä¸ªåéï¼å°±æ¯ä½¿ç¨æä¹ åçæ°æ®
ãã3.å¦æ使ç¨ç¬¬äºç§æ¹å¼ï¼ä¸è½ç´§è·actionç®å
SPARK- - Spark支持unpivot源码分析
unpivot是数据库系统中用于列转行的内置函数,如SQL SERVER, Oracle等。以数据集tb1为例,每个数字代表某个人在某个学科的成绩。若要将此表扩展为三元组,武陵神装源码可使用union实现。但随列数增加,SQL语句变长。许多SQL引擎提供内置函数unpivot简化此过程。unpivot使用时需指定保留列、进行转行的列、新列名及值列名。
SPARK从SPARK-版本开始支持DataSet的unpivot函数,逐步扩展至pyspark与SQL。在Dataset API中,ids为要保留的Column数组,Column类提供了从String构造Column的隐式转换,方便使用。利用此API,可通过unpivot函数将数据集转换为所需的三元组。values表示转行列,variableColumnName为新列名,valueColumnName为值列名。
Analyser阶段解析unpivot算子,将逻辑执行计划转化为物理执行计划。当用户开启hive catalog,SPARK SQL根据表名和metastore URL查找表元数据,转化为Hive相关逻辑执行计划。物理执行计划如BroadcastHashJoinExec,表示具体的执行策略。规则ResolveUnpivot将包含unpivot的算子转换为Expand算子,在物理执行计划阶段执行。此转换由开发者自定义规则完成,通过遍历逻辑执行计划树,根据节点类型及状态进行不同处理。
unpivot函数实现过程中,首先将原始数据集投影为包含ids、variableColumnName、valueColumnName的列,实现语义转换。随后,通过map函数处理values列,构建新的行数据,最终返回Expand算子。在物理执行计划阶段,Expand算子将数据转换为所需形式,实现unpivot功能。
综上所述,SPARK内置函数unpivot的实现通过解析列参数,组装Expand算子完成,为用户提供简便的列转行功能。通过理解此过程,可深入掌握SPARK SQL的开发原理与内在机制。
Spark repartitionåcoalesceçåºå«
æäºæ¶åï¼å¨å¾å¤partitionçæ¶åï¼æ们æ³åå°ç¹partitionçæ°éï¼ä¸ç¶åå°HDFSä¸çæ件æ°éä¹ä¼å¾å¤å¾å¤ã
æ们使ç¨reparationå¢ï¼è¿æ¯coalesceãæ以æ们å¾äºè§£è¿ä¸¤ä¸ªç®åçå å¨åºå«ã
è¦ç¥éï¼repartitionæ¯ä¸ä¸ªæ¶èæ¯è¾æè´µçæä½ç®åï¼Sparkåºäºä¸ä¸ªä¼åççrepartitionå«åcoalesceï¼å®å¯ä»¥å°½éé¿å æ°æ®è¿ç§»ï¼
ä½æ¯ä½ åªè½åå°RDDçpartition.
举个ä¾åï¼æå¦ä¸æ°æ®èç¹åå¸ï¼
ç¨coalesceï¼å°partitionåå°å°2个ï¼
注æï¼Node1 å Node3 ä¸éè¦ç§»å¨åå§çæ°æ®
The repartition algorithm does a full shuffle and creates new partitions with data thatâs distributed evenly.
Letâs create a DataFrame with the numbers from 1 to .
repartition ç®æ³ä¼åä¸ä¸ªfull shuffleç¶ååååå¸å°å建æ°çpartitionãæ们å建ä¸ä¸ª1-æ°åçDataFrameæµè¯ä¸ä¸ã
åå¼å§æ°æ®æ¯è¿æ ·åå¸çï¼
æ们åä¸ä¸ªfull shuffleï¼å°å ¶repartition为2个ã
è¿æ¯å¨ææºå¨ä¸æ°æ®åå¸çæ åµï¼
Partition A: 1, 3, 4, 6, 7, 9, ,
Partition B: 2, 5, 8,
The repartition method makes new partitions and evenly distributes the data in the new partitions (the data distribution is more even for larger data sets).
repartitionæ¹æ³è®©æ°çpartitionååå°åå¸äºæ°æ®ï¼æ°æ®é大çæ åµä¸å ¶å®ä¼æ´ååï¼
coalesceç¨å·²æçpartitionå»å°½éåå°æ°æ®shuffleã
repartitionå建æ°çpartition并ä¸ä½¿ç¨ full shuffleã
coalesceä¼ä½¿å¾æ¯ä¸ªpartitionä¸åæ°éçæ°æ®åå¸ï¼æäºæ¶åå个partitionä¼æä¸åçsizeï¼
ç¶èï¼repartition使å¾æ¯ä¸ªpartitionçæ°æ®å¤§å°é½ç²ç¥å°ç¸çã
coalesce ä¸ repartitionçåºå«ï¼æ们ä¸é¢è¯´çcoalesceé½é»è®¤shuffleåæ°ä¸ºfalseçæ åµï¼
repartition(numPartitions:Int):RDD[T]åcoalesce(numPartitions:Intï¼shuffle:Boolean=false):RDD[T] repartitionåªæ¯coalesceæ¥å£ä¸shuffle为trueçå®ç°
æ1wçå°æ件ï¼èµæºä¹ä¸º--executor-memory 2g --executor-cores 2 --num-executors 5ã
repartition(4)ï¼äº§çshuffleãè¿æ¶ä¼å¯å¨5个executoråä¹åä»ç»çé£æ ·ä¾æ¬¡è¯»å1w个ååºçæ件ï¼ç¶åæç §æ个è§å%4,åå°4个æ件ä¸ï¼è¿æ ·ååºç4个æ件åºæ¬æ¯«æ è§å¾ï¼æ¯è¾ååã
coalesce(4):è¿ä¸ªcoalesceä¸ä¼äº§çshuffleãé£å¯å¨5个executorå¨ä¸åçshuffleçæ¶åæ¯å¦ä½çæ4个æ件å¢ï¼å ¶å®ä¼æ1个æ2个æ3个çè³æ´å¤çexecutorå¨ç©ºè·ï¼å ·ä½å 个executor空è·ä¸sparkè°åº¦æå ³ï¼ä¸æ°æ®æ¬å°æ§æå ³ï¼ä¸sparké群è´è½½æå ³ï¼ï¼ä»å¹¶æ²¡æ读åä»»ä½æ°æ®ï¼
1.å¦æç»æ产ççæ件æ°è¦æ¯æºRDD partitionå°ï¼ç¨coalesceæ¯å®ç°ä¸äºçï¼ä¾å¦æ4个å°æ件ï¼4个partitionï¼ï¼ä½ è¦çæ5个æ件ç¨coalesceå®ç°ä¸äºï¼ä¹å°±æ¯è¯´ä¸äº§çshuffleï¼æ æ³å®ç°æ件æ°åå¤ã
2.å¦æä½ åªæ1个executorï¼1个coreï¼ï¼æºRDD partitionæ5个ï¼ä½ è¦ç¨coalesce产ç2个æ件ãé£ä¹ä»æ¯é¢åpartitionå°executorä¸çï¼ä¾å¦0-2å·ååºå¨å executorä¸æ§è¡å®æ¯ï¼3-4å·ååºå次å¨åä¸ä¸ªexecutoræ§è¡ãå ¶å®é½æ¯åä¸ä¸ªexecutorä½æ¯ååè¦ä¸²è¡è¯»ä¸åæ°æ®ãä¸ç¨repartition(2)å¨è¯»partitionä¸æè¾å¤§ä¸åï¼ä¸²è¡ä¾æ¬¡è¯»0-4å·partition å%2å¤çï¼ã
T表æGæ°æ® æ个partition èµæºä¹ä¸º--executor-memory 2g --executor-cores 2 --num-executors 5ãæ们æ³è¦ç»ææ件åªæä¸ä¸ª
Spark原理详解
Spark原理详解: Spark是一个专为大规模数据处理设计的内存计算框架,其高效得益于其核心组件——弹性数据分布集RDD。RDD是Spark的数据结构,它将数据存储在分布式内存中,通过逻辑上的集中管理和物理上的分布式存储,提供了高效并行计算的能力。 RDD的五个关键特性如下:每个RDD由多个partition组成,用户可以指定分区数量,默认为CPU核心数。每个partition独立处理,便于并行计算。
Spark的计算基于partition,算子作用于partition上,无需保存中间结果,提高效率。
RDD之间有依赖性,数据丢失时仅重新计算丢失分区,避免全量重算。
对于key-value格式的RDD,有Partitioner决定分片和数据分布,优化数据处理的本地化。
Spark根据数据位置调度任务,实现“移动计算”而非数据。
Spark区分窄依赖(一对一)和宽依赖(一对多),前者不涉及shuffle,后者则会根据key进行数据切分。 Spark的执行流程包括用户提交任务、生成DAG、划分stage和task、在worker节点执行计算等步骤。创建RDD的方式多样,包括程序中的集合、本地文件、HDFS、数据库、NoSQL和数据流等。 技术栈方面,Spark与HDFS、YARN、MR、Hive等紧密集成,提供SparkCore、SparkSQL、SparkStreaming等扩展功能。 在编写Spark代码时,首先创建SparkConf和SparkContext,然后操作RDD进行转换和应用Action,最后关闭SparkContext。理解底层机制有助于优化资源使用,如HDFS文件的split与partition关系。 搭建Spark集群涉及上传、配置worker和master信息,以及启动和访问。内存管理则需注意Executor的off-heap和heap,以及Spark内存的分配和使用。