目录

Spark专题-第二部分Spark-SQL-入门8-算子介绍-sort

Spark专题-第二部分:Spark SQL 入门(8)-算子介绍-sort

[https://csdnimg.cn/release/blogv2/dist/pc/img/activeVector.png VibeCoding·九月创作之星挑战赛 10w+人浏览 2.2k人参与

https://csdnimg.cn/release/blogv2/dist/pc/img/arrowright-line-White.png]( )

Spark专题-第二部分:Spark SQL 入门(8)-算子介绍-sort

要不是在工作中发现了性能极差的cluster by写法,真把sort算子给忘了,赶紧补上

Sort算子概述

在Spark SQL中,排序操作通过SortExec物理算子实现。不同的SQL排序语法会产生不同的执行计划,涉及不同的数据分布和排序策略。

排序相关SQL语法及对应算子

1. ORDER BY

  • 描述: 全局排序,对整个数据集进行排序

  • 对应算子: SortExec + Exchange(单分区)

  • 执行特点: 需要将所有数据收集到单个Executor进行排序

  • SQL写法:

    SELECT * FROM sales ORDER BY sale_amount DESC;

2. SORT BY

  • 描述: 在每个分区内排序,不保证全局有序

  • 对应算子: SortExec(无Exchange)

  • 执行特点: 分区内排序,性能较好但结果不是全局有序

  • SQL写法:

    SELECT * FROM sales SORT BY sale_amount DESC;

3. DISTRIBUTE BY

  • 描述: 按指定列重新分布数据,但不排序

  • 对应算子: Exchange(哈希分区)

  • 执行特点: 仅重新分区,不进行排序

  • SQL写法:

    SELECT * FROM sales DISTRIBUTE BY department;

4. CLUSTER BY

  • 描述: DISTRIBUTE BY和SORT BY的组合,按相同列分区和排序

  • 对应算子: Exchange + SortExec

  • 执行特点: 先按列分区,然后在每个分区内按同一列排序

  • SQL写法:

    SELECT * FROM sales CLUSTER BY department;

物理执行计划对比

ORDER BY执行流程

数据流示例

分区1: Bob-1500, Alice-1000

分区2: Charlie-1200, Alice-800

Exchange收集所有数据到单个分区

单分区: Charlie-1200, Bob-1500, Alice-1000, Alice-800

SortExec按sale_amount DESC排序

结果: Bob-1500, Charlie-1200, Alice-1000, Alice-800

TableScan sales

Exchange: 单分区全局收集

SortExec: 全局排序

Output

SORT BY执行流程

数据流示例

分区1原始: Bob-1500, Alice-1000

分区1排序后: Bob-1500, Alice-1000

分区2原始: Charlie-1200, Alice-800

分区2排序后: Charlie-1200, Alice-800

输出不是全局有序

TableScan sales

SortExec: 分区内排序

Output

DISTRIBUTE BY执行流程

数据分布示例

原始分区

按department重新分区

Tech分区: Alice-1000, Bob-1500, Alice-800, Bob-2000

HR分区: Charlie-1200

分区内无序

TableScan sales

Exchange: 按department哈希分区

Output

CLUSTER BY执行流程

数据流示例

原始数据分布在多个分区

按department哈希分区

Tech分区: Alice-1000, Bob-1500, Alice-800, Bob-2000

Tech分区排序后: Alice-1000, Alice-800, Bob-1500, Bob-2000

HR分区: Charlie-1200

TableScan sales

Exchange: 按department哈希分区

SortExec: 分区内按department排序

Output

实际案例与执行计划分析

案例: 销售数据分析

-- 创建示例表
CREATE TABLE sales (
    sale_id INT,
    salesperson STRING,
    department STRING,
    sale_amount DOUBLE,
    sale_date DATE
);

-- 插入数据
INSERT INTO sales VALUES
(1, 'Alice', 'Tech', 1000.0, '2023-01-15'),
(2, 'Bob', 'Tech', 1500.0, '2023-01-16'),
(3, 'Alice', 'Tech', 800.0, '2023-02-10'),
(4, 'Charlie', 'HR', 1200.0, '2023-01-20'),
(5, 'Bob', 'Tech', 2000.0, '2023-02-05');

ORDER BY执行计划

EXPLAIN EXTENDED
SELECT * FROM sales ORDER BY sale_amount DESC;

物理执行计划:

== Physical Plan ==
*(2) Sort [sale_amount#10 DESC NULLS LAST], true, 0
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
+- *(1) Scan ExistingRDD[sale_id#8,salesperson#9,sale_amount#10,department#11,sale_date#12]

SORT BY执行计划

EXPLAIN EXTENDED
SELECT * FROM sales SORT BY sale_amount DESC;

物理执行计划:
== Physical Plan ==
*(1) Sort [sale_amount#10 DESC NULLS LAST], false, 0
± *(1) Scan ExistingRDD[sale_id#8,salesperson#9,sale_amount#10,department#11,sale_date#12]

DISTRIBUTE BY + SORT BY执行计划

EXPLAIN EXTENDED
SELECT * FROM sales 
DISTRIBUTE BY department 
SORT BY sale_amount DESC;

物理执行计划:

== Physical Plan ==
*(2) Sort [sale_amount#10 DESC NULLS LAST], false, 0
+- Exchange hashpartitioning(department#11, 200), ENSURE_REQUIREMENTS, [id=#15]
+- *(1) Scan ExistingRDD[sale_id#8,salesperson#9,sale_amount#10,department#11,sale_date#12]

性能优化策略

1. 避免全局排序(ORDER BY)

小数据

大数据

需要全局排序?

数据量大小

使用ORDER BY

考虑替代方案

使用SORT BY + 后续处理

使用CLUSTER BY

使用SORT BY

2. 分区策略优化

-- 调整分区数避免数据倾斜
SET spark.sql.shuffle.partitions=100;

-- 对于已知数据分布,使用范围分区
SET spark.sql.adaptive.enabled=true;
SET spark.sql.adaptive.coalescePartitions.enabled=true;

3. 内存管理优化

-- 调整排序内存
SET spark.sql.sort.spill.numElementsForceSpillThreshold=1000000;
SET spark.sql.execution.sort.spill.initialMemoryThreshold=100000;

-- 启用外部排序
SET spark.sql.sort.enableRadixSort=true;
SET spark.sql.useExternalSort=true;

不同场景下的选择策略

场景1: 生成排序报告(需要全局有序)

-- 小数据量:使用ORDER BY
SELECT * FROM daily_sales ORDER BY sale_amount DESC;

-- 大数据量:分阶段处理
CREATE TABLE temp_sorted AS
SELECT * FROM big_sales SORT BY sale_amount DESC;

-- 然后对较小结果集进行全局排序(如果需要)
SELECT * FROM temp_sorted ORDER BY sale_amount DESC;

场景2: 为后续聚合操作准备数据

-- 使用CLUSTER BY优化后续的窗口函数
SELECT salesperson,
       SUM(sale_amount) OVER (PARTITION BY department ORDER BY sale_date) 
FROM sales 
CLUSTER BY department, sale_date;

场景3: 数据重新分布

-- 使用DISTRIBUTE BY优化数据分布
INSERT OVERWRITE TABLE sales_by_dept
SELECT * FROM sales DISTRIBUTE BY department;

执行计划解析技巧

识别排序类型

在物理执行计划中关注:

  • 全局排序: Sort [column ASC/DESC], true, 0 + Exchange SinglePartition
  • 分区内排序: Sort [column ASC/DESC], false, 0
  • 分区排序: Sort [column ASC/DESC], false, 0 + Exchange hashpartitioning

性能瓶颈诊断

-- 查看排序操作的详细统计
EXPLAIN COST
SELECT * FROM sales ORDER BY sale_amount DESC;

-- 监控排序过程中的数据倾斜
SET spark.sql.adaptive.skew.enabled=true;
SET spark.sql.adaptive.logLevel=DEBUG;

最佳实践总结

  1. 小数据全局排序: 使用ORDER BY,但注意内存限制
  2. 大数据局部排序: 使用SORT BY或CLUSTER BY
  3. 优化数据分布: 使用DISTRIBUTE BY为后续操作准备数据
  4. 监控内存使用: 排序操作容易导致内存溢出,需要合理配置
  5. 利用自适应查询: 启用Spark的自适应查询执行优化排序过程

这下应该没有遗漏的重要算子了