Spark专题-第二部分Spark-SQL-入门8-算子介绍-sort
Spark专题-第二部分:Spark SQL 入门(8)-算子介绍-sort
[
VibeCoding·九月创作之星挑战赛
10w+人浏览
2.2k人参与
](
)
Spark专题-第二部分:Spark SQL 入门(8)-算子介绍-sort
要不是在工作中发现了性能极差的cluster by写法,真把sort算子给忘了,赶紧补上
Sort算子概述
在Spark SQL中,排序操作通过SortExec
物理算子实现。不同的SQL排序语法会产生不同的执行计划,涉及不同的数据分布和排序策略。
排序相关SQL语法及对应算子
1. ORDER BY
描述: 全局排序,对整个数据集进行排序
对应算子:
SortExec
+Exchange
(单分区)执行特点: 需要将所有数据收集到单个Executor进行排序
SQL写法:
SELECT * FROM sales ORDER BY sale_amount DESC;
2. SORT BY
描述: 在每个分区内排序,不保证全局有序
对应算子:
SortExec
(无Exchange)执行特点: 分区内排序,性能较好但结果不是全局有序
SQL写法:
SELECT * FROM sales SORT BY sale_amount DESC;
3. DISTRIBUTE BY
描述: 按指定列重新分布数据,但不排序
对应算子:
Exchange
(哈希分区)执行特点: 仅重新分区,不进行排序
SQL写法:
SELECT * FROM sales DISTRIBUTE BY department;
4. CLUSTER BY
描述: DISTRIBUTE BY和SORT BY的组合,按相同列分区和排序
对应算子:
Exchange
+SortExec
执行特点: 先按列分区,然后在每个分区内按同一列排序
SQL写法:
SELECT * FROM sales CLUSTER BY department;
物理执行计划对比
ORDER BY执行流程
数据流示例
分区1: Bob-1500, Alice-1000
分区2: Charlie-1200, Alice-800
Exchange收集所有数据到单个分区
单分区: Charlie-1200, Bob-1500, Alice-1000, Alice-800
SortExec按sale_amount DESC排序
结果: Bob-1500, Charlie-1200, Alice-1000, Alice-800
TableScan sales
Exchange: 单分区全局收集
SortExec: 全局排序
Output
SORT BY执行流程
数据流示例
分区1原始: Bob-1500, Alice-1000
分区1排序后: Bob-1500, Alice-1000
分区2原始: Charlie-1200, Alice-800
分区2排序后: Charlie-1200, Alice-800
输出不是全局有序
TableScan sales
SortExec: 分区内排序
Output
DISTRIBUTE BY执行流程
数据分布示例
原始分区
按department重新分区
Tech分区: Alice-1000, Bob-1500, Alice-800, Bob-2000
HR分区: Charlie-1200
分区内无序
TableScan sales
Exchange: 按department哈希分区
Output
CLUSTER BY执行流程
数据流示例
原始数据分布在多个分区
按department哈希分区
Tech分区: Alice-1000, Bob-1500, Alice-800, Bob-2000
Tech分区排序后: Alice-1000, Alice-800, Bob-1500, Bob-2000
HR分区: Charlie-1200
TableScan sales
Exchange: 按department哈希分区
SortExec: 分区内按department排序
Output
实际案例与执行计划分析
案例: 销售数据分析
-- 创建示例表
CREATE TABLE sales (
sale_id INT,
salesperson STRING,
department STRING,
sale_amount DOUBLE,
sale_date DATE
);
-- 插入数据
INSERT INTO sales VALUES
(1, 'Alice', 'Tech', 1000.0, '2023-01-15'),
(2, 'Bob', 'Tech', 1500.0, '2023-01-16'),
(3, 'Alice', 'Tech', 800.0, '2023-02-10'),
(4, 'Charlie', 'HR', 1200.0, '2023-01-20'),
(5, 'Bob', 'Tech', 2000.0, '2023-02-05');
ORDER BY执行计划
EXPLAIN EXTENDED
SELECT * FROM sales ORDER BY sale_amount DESC;
物理执行计划:
== Physical Plan ==
*(2) Sort [sale_amount#10 DESC NULLS LAST], true, 0
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
+- *(1) Scan ExistingRDD[sale_id#8,salesperson#9,sale_amount#10,department#11,sale_date#12]
SORT BY执行计划
EXPLAIN EXTENDED
SELECT * FROM sales SORT BY sale_amount DESC;
物理执行计划:
== Physical Plan ==
*(1) Sort [sale_amount#10 DESC NULLS LAST], false, 0
± *(1) Scan ExistingRDD[sale_id#8,salesperson#9,sale_amount#10,department#11,sale_date#12]
DISTRIBUTE BY + SORT BY执行计划
EXPLAIN EXTENDED
SELECT * FROM sales
DISTRIBUTE BY department
SORT BY sale_amount DESC;
物理执行计划:
== Physical Plan ==
*(2) Sort [sale_amount#10 DESC NULLS LAST], false, 0
+- Exchange hashpartitioning(department#11, 200), ENSURE_REQUIREMENTS, [id=#15]
+- *(1) Scan ExistingRDD[sale_id#8,salesperson#9,sale_amount#10,department#11,sale_date#12]
性能优化策略
1. 避免全局排序(ORDER BY)
是
小数据
大数据
否
需要全局排序?
数据量大小
使用ORDER BY
考虑替代方案
使用SORT BY + 后续处理
使用CLUSTER BY
使用SORT BY
2. 分区策略优化
-- 调整分区数避免数据倾斜
SET spark.sql.shuffle.partitions=100;
-- 对于已知数据分布,使用范围分区
SET spark.sql.adaptive.enabled=true;
SET spark.sql.adaptive.coalescePartitions.enabled=true;
3. 内存管理优化
-- 调整排序内存
SET spark.sql.sort.spill.numElementsForceSpillThreshold=1000000;
SET spark.sql.execution.sort.spill.initialMemoryThreshold=100000;
-- 启用外部排序
SET spark.sql.sort.enableRadixSort=true;
SET spark.sql.useExternalSort=true;
不同场景下的选择策略
场景1: 生成排序报告(需要全局有序)
-- 小数据量:使用ORDER BY
SELECT * FROM daily_sales ORDER BY sale_amount DESC;
-- 大数据量:分阶段处理
CREATE TABLE temp_sorted AS
SELECT * FROM big_sales SORT BY sale_amount DESC;
-- 然后对较小结果集进行全局排序(如果需要)
SELECT * FROM temp_sorted ORDER BY sale_amount DESC;
场景2: 为后续聚合操作准备数据
-- 使用CLUSTER BY优化后续的窗口函数
SELECT salesperson,
SUM(sale_amount) OVER (PARTITION BY department ORDER BY sale_date)
FROM sales
CLUSTER BY department, sale_date;
场景3: 数据重新分布
-- 使用DISTRIBUTE BY优化数据分布
INSERT OVERWRITE TABLE sales_by_dept
SELECT * FROM sales DISTRIBUTE BY department;
执行计划解析技巧
识别排序类型
在物理执行计划中关注:
- 全局排序:
Sort [column ASC/DESC], true, 0
+Exchange SinglePartition
- 分区内排序:
Sort [column ASC/DESC], false, 0
- 分区排序:
Sort [column ASC/DESC], false, 0
+Exchange hashpartitioning
性能瓶颈诊断
-- 查看排序操作的详细统计
EXPLAIN COST
SELECT * FROM sales ORDER BY sale_amount DESC;
-- 监控排序过程中的数据倾斜
SET spark.sql.adaptive.skew.enabled=true;
SET spark.sql.adaptive.logLevel=DEBUG;
最佳实践总结
- 小数据全局排序: 使用ORDER BY,但注意内存限制
- 大数据局部排序: 使用SORT BY或CLUSTER BY
- 优化数据分布: 使用DISTRIBUTE BY为后续操作准备数据
- 监控内存使用: 排序操作容易导致内存溢出,需要合理配置
- 利用自适应查询: 启用Spark的自适应查询执行优化排序过程
这下应该没有遗漏的重要算子了