目录

Paimon官网阅读Spark-引擎

Paimon——官网阅读:Spark 引擎

快速入门

准备工作

Paimon目前支持Spark 3.5、3.4、3.3、3.2和3.1 。为获得更好的体验,我们推荐使用最新的Spark版本。

下载对应版本的JAR文件。

版本JAR文件
Spark 3.5
Spark 3.4
Spark 3.3
Spark 3.2
Spark 3.1

你也可以从源代码手动构建捆绑JAR文件。

要从源代码构建, 。

使用以下命令构建捆绑JAR文件。

mvn clean install -DskipTests

对于Spark 3.3,你可以在./paimon - spark/paimon - spark - 3.3/target/paimon - spark - 3.3 - 0.9.0.jar找到捆绑JAR文件。

设置

如果你正在使用HDFS,请确保设置了环境变量HADOOP_HOMEHADOOP_CONF_DIR

步骤1:指定Paimon JAR文件

在启动spark - sql时,将Paimon JAR文件的路径追加到--jars参数中。

spark - sql … –jars /path/to/paimon - spark - 3.3 - 0.9.0.jar

或者使用--packages选项。

spark - sql … –packages org.apache.paimon:paimon - spark - 3.3:0.9.0

另外,你可以将paimon - spark - 3.3 - 0.9.0.jar复制到Spark安装目录下的spark/jars中。

步骤2:指定Paimon目录

目录:在启动spark - sql时,使用以下命令以名称paimon注册Paimon的Spark目录。仓库的表文件存储在/tmp/paimon下。

spark - sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=file:/tmp/paimon \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

目录是通过spark.sql.catalog.(catalog_name)下的属性进行配置的。在上述示例中,paimon是目录名称,你可以将其更改为自己喜欢的目录名称。

spark - sql命令行启动后,运行以下SQL创建并切换到default数据库。

USE paimon;
USE default;

切换到该目录(USE paimon)后,Spark现有的表将无法直接访问,你可以使用spark_catalog.${database_name}.${table_name}来访问Spark表。

Create Table

Catalog

create table my_table (
    k int,
    v string
) tblproperties (
    'primary-key' = 'k'
);

Generic Catalog

create table my_table (
    k int,
    v string
) USING paimon
tblproperties (
    'primary-key' = 'k'
) ;

Insert Table

Paimon目前支持Spark 3.2及以上版本进行SQL写入。

INSERT INTO my_table VALUES (1, ‘Hi’), (2, ‘Hello’);

Query Table

SQL

SELECT * FROM my_table;
/*
1   Hi
2   Hello
*/

DataFrame

val dataset = spark.read.format("paimon").load("file:/tmp/paimon/default.db/my_table")dataset.show()

/*
+---+------+
| k |     v|
+---+------+
|  1|    Hi|
|  2| Hello|
+---+------+
*/

Spark类型转换

本节列出了Spark和Paimon之间所有支持的类型转换。所有Spark的数据类型都在org.apache.spark.sql.types包中。

Spark数据类型Paimon数据类型原子类型
StructTypeRowTypefalse
MapTypeMapTypefalse
ArrayTypeArrayTypefalse
BooleanTypeBooleanTypetrue
ByteTypeTinyIntTypetrue
ShortTypeSmallIntTypetrue
IntegerTypeIntTypetrue
LongTypeBigIntTypetrue
FloatTypeFloatTypetrue
DoubleTypeDoubleTypetrue
StringTypeVarCharType(Integer.MAX_VALUE)true
VarCharType(length)VarCharType(length)true
CharType(length)CharType(length)true
DateTypeDateTypetrue
TimestampTypeLocalZonedTimestamptrue
TimestampNTZType(Spark3.4 +)TimestampTypetrue
DecimalType(precision, scale)DecimalType(precision, scale)true
BinaryTypeVarBinaryType, BinaryTypetrue

由于之前的设计,在Spark3.3及以下版本中,Paimon会将Paimon的TimestampTypeLocalZonedTimestamp都映射到Spark的TimestampType,并且仅能正确处理TimestampType

因此,在使用Spark3.3及以下版本读取由其他引擎(如Flink)写入的具有LocalZonedTimestamp类型的Paimon表时,LocalZonedTimestamp类型的查询结果将带有时区偏移,需要手动调整。

当使用Spark3.4及以上版本时,所有时间戳类型都可以被正确解析。

拓展:

  • Paimon与Spark集成:Paimon作为一种数据存储和管理解决方案,与Spark的集成拓宽了其应用场景。通过支持多种Spark版本,用户可以根据自身的技术栈和需求灵活选择合适的Spark版本来与Paimon协同工作。从准备工作到具体的设置步骤,详细的指引确保了用户能够顺利搭建起两者协同的环境。
  • 类型转换细节:Spark和Paimon之间的类型转换是确保数据正确处理和存储的关键。不同版本的Spark在处理Paimon特定时间戳类型时存在差异,这反映了不同版本在设计和实现上的特点。例如,Spark 3.3及以下版本对LocalZonedTimestamp类型处理的局限性,强调了版本选择对数据处理准确性的影响。在实际应用中,开发者需要根据具体情况,如数据来源(是否由其他引擎写入带有特定时间戳类型的数据)和Spark版本,来合理处理数据类型,以避免因类型转换问题导致的数据不一致或错误。
  • 构建与配置:从源代码构建Paimon的Spark JAR文件,让有定制化需求的用户能够根据自身需要进行构建。而设置过程中对JAR文件路径的指定和目录的配置,涉及到Spark的命令行参数和配置属性的使用,这对于理解Spark的运行机制以及如何与外部存储系统集成至关重要。例如,--jars--packages选项的不同用法为用户提供了灵活的添加外部依赖的方式,而spark.sql.catalog相关配置则决定了Paimon在Spark中的目录结构和存储位置等关键信息。

SQL DDL

Create Catalog

Paimon目录目前支持三种类型的元存储:

  • 文件系统元存储(默认),它在文件系统中同时存储元数据和表文件。

  • Hive元存储,它额外将元数据存储在Hive元存储中。用户可以直接从Hive访问这些表。

  • JDBC元存储,它额外将元数据存储在关系数据库中,如MySQL、Postgres等。

    创建目录时的详细选项请参阅CatalogOptions。

Create Filesystem Catalog

以下Spark SQL注册并使用一个名为my_catalog的Paimon目录。元数据和表文件存储在hdfs:///path/to/warehouse下。

以下Shell命令注册一个名为paimon的Paimon目录。元数据和表文件存储在hdfs:///path/to/warehouse下。

spark - sql... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse

对于在此目录中创建的表,你可以使用前缀spark.sql.catalog.paimon.table - default. 定义任何默认表选项。

在启动spark - sql后,你可以使用以下SQL切换到paimon目录的默认数据库。

USE paimon.default;

Creating Hive Catalog

通过使用Paimon Hive目录,对该目录的更改将直接影响相应的Hive元存储。在此类目录中创建的表也可以直接从Hive访问。

要使用Hive目录,数据库名、表名和字段名应为小写。

你的Spark安装应该能够检测到Hive依赖项,或者已经包含这些依赖项。更多信息请参阅 。

以下Shell命令注册一个名为paimon的Paimon Hive目录。元数据和表文件存储在hdfs:///path/to/warehouse下。此外,元数据也存储在Hive元存储中。

spark - sql... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
    --conf spark.sql.catalog.paimon.metastore=hive \
    --conf spark.sql.catalog.paimon.uri=thrift://:

对于在此目录中创建的表,你可以使用前缀spark.sql.catalog.paimon.table-default. 定义任何默认表选项。

在启动spark - sql后,你可以使用以下SQL切换到paimon目录的默认数据库。

USE paimon.default;

此外,你可以创建 。

Synchronizing Partitions into Hive Metastore

默认情况下,Paimon不会将新创建的分区同步到Hive元存储。用户在Hive中看到的将是一个未分区的表。分区下推将通过过滤下推来实现。

如果你想在Hive中看到一个分区表,并且也将新创建的分区同步到Hive元存储,请将表属性metastore.partitioned-table设置为true。另请参阅 。

Creating JDBC Catalog

通过使用Paimon JDBC目录,对该目录的更改将直接存储在关系数据库中,如SQLite、MySQL、Postgres等。

目前,锁配置仅支持MySQL和SQLite。如果你使用不同类型的数据库存储目录,请不要配置lock.enabled。

Spark中的Paimon JDBC目录需要正确添加用于连接数据库的相应JAR包。你应该首先下载JDBC连接器捆绑JAR并将其添加到类路径中,例如MySQL、Postgres。

数据库类型捆绑包名称SQL客户端JAR
mysqlmysql-connector-java
postgrespostgresql
spark - sql... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
    --conf spark.sql.catalog.paimon.metastore=jdbc \
    --conf spark.sql.catalog.paimon.uri=jdbc:mysql://<host>:<port>/<databaseName> \
    --conf spark.sql.catalog.paimon.jdbc.user=... \
    --conf spark.sql.catalog.paimon.jdbc.password=...

USE paimon.default;

Create Table

在使用Paimon目录后,你可以创建和删除表。在Paimon目录中创建的表由该目录管理。当从目录中删除表时,其表文件也将被删除。

以下SQL假设你已经注册并正在使用一个Paimon目录。它在目录的默认数据库中创建一个名为my_table的托管表,该表有五列,其中dt、hh和user_id是主键。

CREATE TABLE my_table (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);

你可以创建分区表:

CREATE TABLE my_table (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);

Create Table As Select(CTAS)

表可以通过查询结果来创建并填充数据。例如,我们有这样一条SQL:CREATE TABLE table_b AS SELECT id, name FROM table_a,结果表table_b等同于使用以下语句创建表并插入数据:CREATE TABLE table_b (id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;

当使用CREATE TABLE AS SELECT时,我们可以指定主键或分区,语法请参考以下SQL。

CREATE TABLE my_table (
     user_id BIGINT,
     item_id BIGINT
);
CREATE TABLE my_table_as AS SELECT * FROM my_table;

/* partitioned table*/
CREATE TABLE my_table_partition (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE my_table_partition_as PARTITIONED BY (dt) AS SELECT * FROM my_table_partition;

/* change TBLPROPERTIES */
CREATE TABLE my_table_options (
       user_id BIGINT,
       item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
CREATE TABLE my_table_options_as TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM my_table_options;


/* primary key */
CREATE TABLE my_table_pk (
     user_id BIGINT,
     item_id BIGINT,
     behavior STRING,
     dt STRING,
     hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_pk_as TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM my_table_pk;

/* primary key + partition */
CREATE TABLE my_table_all (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all;

拓展:

  • Paimon目录类型:Paimon支持的三种元存储类型为用户提供了不同的数据管理和访问方式。文件系统元存储简单直接,适用于对Hive或关系数据库依赖度较低的场景;Hive元存储则便于与现有的Hive生态集成,实现数据在Hive和Paimon间的无缝交互;JDBC元存储使得元数据管理与常见的关系数据库相结合,利用关系数据库的特性(如事务支持、数据持久化等)来管理Paimon的元数据。
  • 分区同步与表创建:将分区同步到Hive元存储的设置涉及到不同数据存储和查询引擎间的兼容性与数据一致性。通过设置特定表属性来控制分区同步,这在多引擎协同工作的数据处理流程中非常重要。而通过CREATE TABLE AS SELECT语句创建表,不仅简化了表创建和数据插入的流程,还提供了灵活的方式来定义表结构、分区和主键等属性,在数据处理和ETL(Extract,Transform,Load)过程中能够快速根据已有数据构建新的数据集。
  • JDBC目录的注意事项:在使用Paimon JDBC目录时,针对不同数据库类型的配置差异以及锁配置的支持情况是需要重点关注的。例如,仅MySQL和SQLite支持锁配置,这对于需要保证数据一致性和并发访问控制的场景具有重要影响。同时,正确添加相应JDBC连接器JAR包到类路径是确保与数据库成功连接的关键步骤,反映了在跨系统集成时依赖管理的重要性。

SQL Write

Syntax

INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };

更多信息,请查看语法文档:Spark INSERT语句

INSERT INTO

使用INSERT INTO将记录和更改应用到表中。

INSERT INTO my_table SELECT...

Overwriting the Whole Table

使用INSERT OVERWRITE覆盖整个未分区表。

INSERT OVERWRITE my_table SELECT...

Overwriting a Partition

使用INSERT OVERWRITE覆盖一个分区。

INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2,...) SELECT...

Dynamic Overwrite

Spark的默认覆盖模式是静态分区覆盖。要启用动态覆盖,你需要将Spark会话配置spark.sql.sources.partitionOverwriteMode设置为dynamic

例如:

CREATE TABLE my_table (id INT, pt STRING) PARTITIONED BY (pt);
INSERT INTO my_table VALUES (1, 'p1'), (2, 'p2');

-- 静态覆盖(覆盖整张表)
INSERT OVERWRITE my_table VALUES (3, 'p1');

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  3| p1|
+---+---+
*/

-- 动态覆盖(仅覆盖pt='p1')
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE my_table VALUES (3, 'p1');

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  2| p2|
|  3| p1|
+---+---+
*/

Truncate tables

TRUNCATE TABLE my_table;

Updating tables

Spark支持更新基本类型(PrimitiveType)和结构体类型(StructType),例如:

-- 语法
UPDATE table_identifier SET column1 = value1, column2 = value2,... WHERE condition;

CREATE TABLE t (
  id INT, 
  s STRUCT, 
  name STRING)
TBLPROPERTIES (
  'primary-key' = 'id', 
 'merge-engine' = 'deduplicate'
);

-- 你可以使用
UPDATE t SET name = 'a_new' WHERE id = 1;
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;

Deleting from table

DELETE FROM my_table WHERE currency = 'UNKNOWN';

Merging into table

Paimon目前在Spark 3+中支持Merge Into语法,它允许在单个提交中基于源表进行一组更新、插入和删除操作。

这仅适用于有主键的表。 在更新子句中,不支持更新主键列。 WHEN NOT MATCHED BY SOURCE语法不被支持。

示例一:这是一个简单的演示,如果目标表中存在某一行则更新它,否则插入它。

-- 这里源表和目标表具有相同的模式:(a INT, b INT, c STRING),并且a是主键。

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *

示例二:这是一个带有多个条件子句的演示。

-- 这里源表和目标表具有相同的模式:(a INT, b INT, c STRING),并且a是主键。

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED AND target.a = 5 THEN
   UPDATE SET b = source.b + target.b      -- 当匹配且满足条件1时,更新b;
WHEN MATCHED AND source.c > 'c2' THEN
   UPDATE SET *    -- 当匹配且满足条件2时,更新所有列;
WHEN MATCHED THEN
   DELETE      -- 当匹配时,删除目标表中的这一行;
WHEN NOT MATCHED AND c > 'c9' THEN
   INSERT (a, b, c) VALUES (a, b * 1.1, c)      -- 当不匹配但满足条件3时,转换并插入这一行;
WHEN NOT MATCHED THEN
INSERT *      -- 当不匹配时,不进行任何转换插入这一行;

Streaming Write

Paimon目前支持在Spark 3+中进行流式写入。

Paimon结构化流仅支持追加(append)和完整(complete)两种模式。

// 如果不存在则创建一个Paimon表。
spark.sql(s"""
           |CREATE TABLE T (k INT, v STRING)
           |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
           |""".stripMargin)

// 这里我们使用MemoryStream来模拟一个流数据源。
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")

// 流式写入Paimon表。
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")

Schema Evolution

模式演变是一项功能,它允许用户轻松修改表的当前模式,以适应现有数据或随时间变化的新数据,同时保持数据的完整性和一致性。

Paimon支持在写入数据时自动合并源数据和当前表数据的模式,并将合并后的模式用作表的最新模式,并且只需要配置write.merge-schema

data.write
  .format("paimon")
  .mode("append")
  .option("write.merge-schema", "true")
  .save(location)

当启用write.merge-schema时,Paimon默认允许用户对表模式执行以下操作:

  • 添加列
  • 提升列的类型(例如Int -> Long)

Paimon还支持某些类型之间的显式类型转换(例如String -> Date,Long -> Int),这需要显式配置write.merge-schema.explicit-cast

模式演变可以同时在流式模式下使用。

val inputData = MemoryStream[(Int, String)]
inputData
  .toDS()
  .toDF("col1", "col2")
  .writeStream
  .format("paimon")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("write.merge-schema", "true")
  .option("write.merge-schema.explicit-cast", "true")
  .start(location)

以下列出相关配置:

Scan Mode描述
write.merge-schema如果为true,在写入数据前自动合并数据模式和表模式。
write.merge-schema.explicit-cast如果为true,当两种类型满足显式转换规则时,允许合并数据类型。

拓展

  • 数据操作语法:这部分内容详细介绍了在Paimon与Spark集成环境下丰富的数据操作语法。INSERT相关语法涵盖了常规插入、覆盖表及分区等不同场景,并且介绍了静态和动态分区覆盖模式的区别与使用方法,这对于数据的增量更新和全量替换等操作提供了灵活手段。UPDATEDELETEMERGE INTO语法则进一步丰富了数据修改的功能,MERGE INTO语法在处理复杂的数据同步和更新逻辑时尤为强大,能够在一次操作中完成多种类型的数据变更。
  • 流式写入与模式演变:流式写入支持为实时数据处理提供了能力,Paimon结合Spark的结构化流功能,限定在追加和完整两种模式下,满足不同的实时数据写入需求。模式演变功能则是Paimon应对数据模式变化的重要特性,通过简单配置即可实现模式自动合并以及特定类型的显式转换,这在数据不断变化的大数据场景中,大大提高了数据处理的灵活性和适应性,减少了因数据模式变更带来的复杂处理流程。这些功能的结合,使得Paimon在数据处理和管理方面形成了一个较为完整的体系,适用于多种复杂的数据处理场景。

SQL Query

与所有其他表一样,Paimon表可以使用SELECT语句进行查询。

Batch Query

Paimon的批量读取会返回表某个快照中的所有数据。默认情况下,批量读取返回最新的快照。

Batch Time Travel

Paimon的批量时间回溯读取可以指定一个快照或标签,并读取相应的数据。

要求Spark 3.3+ 。

你可以在查询中使用VERSION AS OFTIMESTAMP AS OF进行时间回溯:

-- 读取ID为1L的快照(使用快照ID作为版本)
SELECT * FROM t VERSION AS OF 1;

-- 从指定时间戳读取快照
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';

-- 从以 Unix 秒为单位的指定时间戳读取快照
SELECT * FROM t TIMESTAMP AS OF 1678883047;

-- 读取标签'my-tag'
SELECT * FROM t VERSION AS OF'my-tag';

-- 从指定的水位线读取快照。将匹配水位线之后的第一个快照
SELECT * FROM t VERSION AS OF 'watermark-1678883047356';

如果标签的名称是一个数字且等于某个快照ID,VERSION AS OF语法将优先考虑标签。例如,如果你有一个基于快照2命名为“1”的标签,SELECT * FROM t VERSION AS OF '1'实际上查询的是快照2而不是快照1。

Batch Incremental

读取起始快照(不包含)和结束快照之间的增量更改。

例如:

  • “5,10” 表示快照5和快照10之间的更改。
  • “TAG1,TAG3” 表示TAG1和TAG3之间的更改。

默认情况下,对于生成变更日志文件的表,将扫描变更日志文件。否则,扫描新更改的文件。你也可以强制指定’incremental - between - scan - mode’。

要求Spark 3.2+ 。

Paimon支持使用Spark SQL通过Spark表值函数实现增量查询。

你可以在查询中使用paimon_incremental_query来提取增量数据:

-- 读取快照ID 12和快照ID 20之间的增量数据。
SELECT * FROM paimon_incremental_query('tableName', 12, 20);

在批量SQL中,不允许返回DELETE记录,因此带有 -D的记录将被丢弃。如果你想查看DELETE记录,可以查询audit_log表。

Streaming Query

Paimon目前支持在Spark 3.3+ 上进行流式读取。

Paimon为流式读取提供了丰富的扫描模式。如下所示:

扫描模式描述
latest对于流数据源,持续读取最新更改,开始时不生成快照。
latest-full对于流数据源,首次启动时生成表的最新快照,并继续读取最新更改。
from-timestamp对于流数据源,从“scan.timestamp - millis”指定的时间戳开始持续读取更改,开始时不生成快照。
from-snapshot对于流数据源,从“scan.snapshot - id”指定的快照开始持续读取更改,开始时不生成快照。
from-snapshot-full对于流数据源,首次启动时从“scan.snapshot - id”指定的快照生成表,并持续读取更改。
default如果指定了“scan.snapshot - id”,则等同于from - snapshot。如果指定了“timestamp - millis”,则等同于from - timestamp。否则,等同于latest - full。

一个使用默认扫描模式的简单示例:

// 未提供任何与扫描相关的配置,将使用latest - full扫描模式。
val query = spark.readStream
  .format("paimon")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

Paimon结构化流还支持多种流式读取模式,它可以支持多种触发器和多种读取限制。

支持的读取限制如下:

默认值类型描述
read.stream.maxFilesPerTrigger(无)整数单个批次中返回的最大文件数。
read.stream.maxBytesPerTrigger(无)长整型单个批次中返回的最大字节数。
read.stream.maxRowsPerTrigger(无)长整型单个批次中返回的最大行数。
read.stream.minRowsPerTrigger(无)长整型单个批次中返回的最小行数,与read.stream.maxTriggerDelayMs一起用于创建MinRowsReadLimit。
read.stream.maxTriggerDelayMs(无)长整型相邻两个批次之间的最大延迟,与read.stream.minRowsPerTrigger一起用于创建MinRowsReadLimit。

示例一:使用org.apache.spark.sql.streaming.Trigger.AvailableNow()和Paimon定义的maxBytesPerTrigger

// Trigger.AvailableNow()) 在查询开始时以一个或多个批次处理所有可用数据,然后终止查询。
// 将read.stream.maxBytesPerTrigger设置为128M意味着每个批次最多处理128 MB的数据。
val query = spark.readStream
  .format("paimon")
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

示例二:使用org.apache.spark.sql.connector.read.streaming.ReadMinRows

// 除非两批次之间的间隔超过300秒,否则直到有超过5000条数据时才会触发一个批次。
val query = spark.readStream
  .format("paimon")
  .option("read.stream.minRowsPerTrigger", "5000")
  .option("read.stream.maxTriggerDelayMs", "300000")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

Paimon结构化流支持以变更日志的形式读取行(在行中添加rowkind列以表示其更改类型),有两种方式:

  • 直接对流读取系统的audit_log
  • read.changelog设置为true(默认值为false),然后对流读取表位置

示例:

// 方式一
val query1 = spark.readStream
  .format("paimon")
  .table("`table_name$audit_log`")
  .writeStream
  .format("console")
  .start()

// 方式二
val query2 = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

/*
+I   1  Hi
+I   2  Hello
*/

Query Optimization

强烈建议在查询时指定分区和主键过滤器,这将加速查询的数据跳过。

能够加速数据跳过的过滤函数有:

  • =
  • <
  • <=
  • =

  • IN (…)
  • LIKE ‘abc%’
  • IS NULL

Paimon会按主键对数据进行排序,这加速了点查询和范围查询。当使用复合主键时,查询过滤器最好形成主键的 ,以获得良好的加速效果。

假设一个表有以下定义:

CREATE TABLE orders (
    catalog_id BIGINT,
    order_id BIGINT,
    .....,
) TBLPROPERTIES (
    'primary-key' = 'catalog_id,order_id'
);

通过为主键的最左前缀指定范围过滤器,查询将获得良好的加速效果。

SELECT * FROM orders WHERE catalog_id=1025;

SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;

SELECT * FROM orders
  WHERE catalog_id=1025
  AND order_id>2035 AND order_id<6000;

然而,以下过滤器不能很好地加速查询。

SELECT * FROM orders WHERE order_id=29495;

SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

拓展:

  • Paimon查询功能:Paimon在查询方面提供了丰富且强大的功能。批量查询涵盖了获取最新快照数据、时间回溯以及增量查询等特性。时间回溯功能允许用户获取历史某个时间点或基于标签的表数据状态,对于数据版本管理和历史数据分析非常有用;增量查询则能高效地获取两个快照间的变化数据,在数据同步、差异分析等场景中具有重要意义。
  • 流式查询特性:流式查询不仅提供了多种扫描模式以满足不同实时数据读取需求,还支持各种读取限制和触发器,这为实时数据处理的资源控制和调度提供了灵活手段。以变更日志形式读取行数据则进一步丰富了对流式数据变化的追踪能力,有助于构建复杂的实时数据处理逻辑。
  • 查询优化要点:查询优化强调了通过合理使用分区和主键过滤器,利用Paimon按主键排序数据的特点来加速查询。这对于提升大数据量下的查询性能至关重要,开发人员在编写查询语句时,应充分考虑表结构和主键定义,以优化查询效率,减少数据扫描范围,从而提升整个数据处理系统的性能。 这些功能的结合,使得Paimon在数据查询处理方面能够适应多样化的业务场景和性能需求。

SQL Alter

Changing/Adding Table Properties

以下SQL将write-buffer-size表属性设置为256 MB。

ALTER TABLE my_table SET TBLPROPERTIES (
    'write-buffer-size' = '256 MB'
);

Removing Table Properties

以下SQL删除write-buffer-size表属性。

ALTER TABLE my_table UNSET TBLPROPERTIES ('write-buffer-size');

Changing/Adding Table Comment

以下SQL将表my_table的注释修改为“table comment”。

ALTER TABLE my_table SET TBLPROPERTIES (
    'comment' = 'table comment'
    );

Removing Table Comment

以下SQL删除表注释。

ALTER TABLE my_table UNSET TBLPROPERTIES ('comment');

Rename Table Name

以下SQL将表名重命名为新名称。

最简单的调用SQL语句为:

ALTER TABLE my_table RENAME TO my_table_new;

请注意:我们可以通过以下方式在Spark中重命名Paimon表:

ALTER TABLE [catalog.[database.]]test1 RENAME to [database.]test2;

但是我们不能在重命名后的表前加上目录名,如果写成这样的SQL会抛出错误:

ALTER TABLE catalog.database.test1 RENAME to catalog.database.test2;

如果使用对象存储,如S3或OSS,请谨慎使用此语法,因为对象存储的重命名操作不是原子性的,失败时可能只会移动部分文件。

Adding New Columns

以下SQL向表my_table添加两列c1c2

ALTER TABLE my_table ADD COLUMNS (
    c1 INT,
    c2 STRING
);

Renaming Column Name

以下SQL将表my_table中的列c0重命名为c1

ALTER TABLE my_table RENAME COLUMN c0 TO c1;

Dropping Columns

以下SQL从表my_table中删除两列c1c2

ALTER TABLE my_table DROP COLUMNS (c1, c2);

Dropping Partitions

以下SQL删除Paimon表的分区。对于Spark SQL,需要指定所有分区列。

ALTER TABLE my_table DROP PARTITION (`id` = 1, `name` = 'paimon');

Changing Column Comment

以下SQL将列buy_count的注释修改为“buy count”。

ALTER TABLE my_table ALTER COLUMN buy_count COMMENT 'buy count';

Adding Column Position

ALTER TABLE my_table ADD COLUMN c INT FIRST;
ALTER TABLE my_table ADD COLUMN c INT AFTER b;

Changing Column Position

ALTER TABLE my_table ALTER COLUMN col_a FIRST;
ALTER TABLE my_table ALTER COLUMN col_a AFTER col_b;

Changing Column Type

ALTER TABLE my_table ALTER COLUMN col_a TYPE DOUBLE;

拓展

  • 表结构与属性操作:这些操作丰富了对Paimon表的管理手段。从表属性的增删改,到表注释、表名的处理,再到列的各种操作(添加、重命名、删除、修改注释、调整位置和类型)以及分区的删除,涵盖了在数据处理过程中可能需要对表结构进行的各种常见变更。例如,修改表属性可以调整数据写入时的参数,像write - buffer - size属性可能影响写入性能;重命名表名、列名等操作有助于更好地组织和标识数据。
  • 注意事项:在重命名表时,特别是涉及对象存储的情况,由于对象存储重命名的非原子性,可能导致部分文件移动失败,这强调了在实际应用中操作的谨慎性。在进行表结构变更时,开发人员需要充分考虑这些操作对现有数据处理流程和数据一致性的影响。例如,修改列类型时需要确保数据的兼容性,避免数据丢失或错误。这些操作的合理运用可以使数据存储结构更好地适应业务需求的变化,提升数据管理的灵活性和高效性。

Auxiliary Statements

Set / Reset

SET 命令用于设置属性、返回现有属性的值,或者返回所有带有值和含义的SQL配置(SQLConf)属性。RESET 命令将通过 SET 命令设置的特定于当前会话的运行时配置重置为其默认值。要专门设置Paimon配置,需要添加 spark.paimon. 前缀。

-- 设置Spark配置
SET spark.sql.sources.partitionOverwriteMode=dynamic;

-- 设置Paimon配置
SET spark.paimon.file.block-size=512M;

-- 重置配置
RESET spark.paimon.file.block-size;

Describe table

DESCRIBE TABLE 语句返回表的基本元数据信息。元数据信息包括列名、列类型和列注释。

-- 描述表
DESCRIBE TABLE my_table;

-- 描述表及附加元数据
DESCRIBE TABLE EXTENDED my_table;

Show create table

SHOW CREATE TABLE 返回用于创建给定表的 CREATE TABLE 语句。

SHOW CREATE TABLE my_table;

Show columns

返回表中的列列表。如果表不存在,则会抛出异常。

SHOW COLUMNS FROM my_table;

Show partitions

SHOW PARTITIONS 语句用于列出表的分区。可以指定一个可选的分区规范,以返回与所提供分区规范匹配的分区。

-- 列出my_table的所有分区
SHOW PARTITIONS my_table;

-- 列出my_table中与所提供分区规范匹配的分区
SHOW PARTITIONS my_table PARTITION (dt=20230817);

Analyze table

ANALYZE TABLE 语句收集关于表的统计信息,查询优化器将使用这些统计信息来找到更好的查询执行计划。Paimon支持通过分析收集表级统计信息和列统计信息。

-- 收集表级统计信息
ANALYZE TABLE my_table COMPUTE STATISTICS;

-- 收集表级统计信息以及col1的列统计信息
ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS col1;

-- 收集表级统计信息以及所有列的列统计信息
ANALYZE TABLE my_table COMPUTE STATISTICS FOR ALL COLUMNS;

拓展:

  • 辅助语句功能:这些辅助语句在数据库操作中扮演着至关重要的角色。SETRESET 语句为用户提供了灵活配置运行时参数的能力,无论是通用的Spark配置还是特定的Paimon配置,都可以根据不同的业务需求进行动态调整。例如,通过设置 spark.sql.sources.partitionOverwriteModedynamic,可以改变数据写入分区时的覆盖模式,从而影响数据更新的策略。
  • 元数据与统计信息操作DESCRIBE TABLESHOW CREATE TABLESHOW COLUMNSSHOW PARTITIONS 等语句帮助用户快速获取表的元数据信息,这对于理解表结构、分区情况以及创建表的语句非常有用,在进行数据查询、维护和迁移等操作时,这些信息是必不可少的基础。而 ANALYZE TABLE 语句则专注于收集统计信息,查询优化器利用这些统计信息能够更智能地生成高效的查询执行计划,从而提升查询性能,特别是在处理大规模数据时,合适的统计信息可以显著减少查询的执行时间和资源消耗。这些辅助语句共同构成了一个完整的数据库辅助操作体系,帮助用户更好地管理和操作Paimon表。

Procedures

本节介绍所有可用的与Paimon相关的Spark存储过程。

存储过程名称说明示例
compact用于压缩文件。参数: * table:目标表标识符,不能为空。 * partitions:分区过滤器,“,” 表示 “AND”,“;” 表示 “OR”。如果要压缩date=01且day=01的一个分区,需写为’date=01,day=01’。留空表示所有分区(不能与“where”一起使用)。 * where:分区谓词,留空表示所有分区(不能与“partitions”一起使用)。 * order_strategy:‘order’ 或 ‘zorder’ 或 ‘hilbert’ 或 ’none’,留空表示 ’none’。 * order_columns:需要排序的列,如果’order_strategy’为 ’none’,则留空。 * partition_idle_time:用于对在 ‘partition_idle_time’ 内未接收任何新数据的分区进行完全压缩,并且仅压缩这些分区。此参数不能与有序压缩一起使用。 SET spark.sql.shuffle.partitions=10; – 设置压缩并行度SET spark.sql.shuffle.partitions=10; –set the compact parallelism CALL sys.compact(table => ‘T’, partitions => ‘p=0;p=1’, order_strategy => ‘zorder’, order_by => ‘a,b’) CALL sys.compact(table => ‘T’, where => ‘p>0 and p<3’, order_strategy => ‘zorder’, order_by => ‘a,b’) CALL sys.compact(table => ‘T’, partition_idle_time => ’60s’)
expire_snapshots用于过期快照。参数: table:目标表标识符,不能为空。 retain_max:要保留的已完成快照的最大数量。 retain_min:要保留的已完成快照的最小数量。 older_than:在此时间戳之前的快照将被删除。 max_deletes:一次可以删除的最大快照数量。CALL sys.expire_snapshots(table => ‘default.T’, retain_max => 10)
expire_partitions用于过期分区。参数: table:目标表标识符,不能为空。 expiration_time:分区的过期时间间隔。如果分区的存在时间超过此值,该分区将过期。分区时间从分区值中提取。 timestamp_formatter:用于从字符串格式化时间戳的格式化器。 timestamp_pattern:用于从分区获取时间戳的模式。 expire_strategy:指定分区过期的过期策略,可能值为:‘values-time’ 或 ‘update-time’,默认值为 ‘values-time’。CALL sys.expire_partitions(table => ‘default.T’, expiration_time => ‘1 d’, timestamp_formatter => ‘yyyy-MM-dd’, timestamp_pattern => ‘$dt’, expire_strategy => ‘values-time’)
create_tag基于给定的快照创建一个标签。参数: table:目标表标识符,不能为空。 tag:新标签的名称,不能为空。 snapshot(Long):新标签所基于的快照的ID。 time_retained:新创建标签的最长保留时间。– 基于快照10,保留1天 CALL sys.create_tag(table => ‘default.T’, tag =>‘my_tag’, snapshot => 10, time_retained => ‘1 d’) – 基于最新快照 CALL sys.create_tag(table => ‘default.T’, tag =>‘my_tag’)
create_tag_from_timestamp基于给定的时间戳创建一个标签。参数: identifier:目标表标识符,不能为空。 tag:新标签的名称。 timestamp (Long):查找提交时间大于此时间戳的第一个快照。 time_retained :新创建标签的最长保留时间。CALL sys.create_tag_from_timestamp(table => ‘default.T’, tag => ‘my_tag’, timestamp => 1724404318750, time_retained => ‘1 d’)
delete_tag删除一个标签。参数: table:目标表标识符,不能为空。 tag:要删除的标签的名称。如果指定多个标签,分隔符为 ‘,’。CALL sys.delete_tag(table => ‘default.T’, tag =>‘my_tag’)
rollback回滚到目标表的特定版本。参数: table:目标表标识符,不能为空。 version:要回滚到的快照ID或标签名称。CALL sys.rollback(table => ‘default.T’, version =>‘my_tag’) CALL sys.rollback(table => ‘default.T’, version => 10)
migrate_table将Hive表迁移到Paimon表。参数: source_type:要迁移的源表类型,如hive,不能为空。 table:要迁移的源表名称,不能为空。 options:要迁移到的Paimon表的表选项。 target_table:要迁移到的目标Paimon表的名称。如果未设置,则与源表名称相同。 delete_origin:如果设置了target_table,可以设置delete_origin来决定迁移后是否从HMS(Hive元存储)中删除源表元数据,默认值为true。 options_map:用于添加键值对选项的选项映射,是一个映射。CALL sys.migrate_table(source_type => ‘hive’, table => ‘default.T’, options => ‘file.format=parquet’, options_map => map(‘k1’,‘v1’))
migrate_file从Hive表迁移到Paimon表。参数: source_type:要迁移的源表类型,如hive,不能为空。 source_table:要迁移的源表名称,不能为空。 target_table:要迁移到的目标表名称,不能为空。 delete_origin:如果设置了target_table,可以设置delete_origin来决定迁移后是否从Hive元存储中删除源表元数据,默认值为true。CALL sys.migrate_file(source_type => ‘hive’, table => ‘default.T’, delete_origin => true)
remove_orphan_files删除孤立的数据文件和元数据文件。参数: table:目标表标识符,不能为空,你可以使用database_name.* 来清理整个数据库。 older_than:为避免删除新写入的文件,此过程默认仅删除超过1天的孤立文件。此参数可修改时间间隔。 dry_run:为true时,仅查看孤立文件,不实际删除文件,默认值为false。CALL sys.remove_orphan_files(table => ‘default.T’, older_than => ‘2023-10-31 12:00:00’) CALL sys.remove_orphan_files(table => ‘default.*’, older_than => ‘2023-10-31 12:00:00’) CALL sys.remove_orphan_files(table => ‘default.T’, older_than => ‘2023-10-31 12:00:00’, dry_run => true)
repair将文件系统中的信息同步到元存储。参数: database_or_table:空或目标数据库名称或目标表标识符,如果指定多个标签,分隔符为 ‘,’CALL sys.repair(’test_db.T’) CALL sys.repair(’test_db.T,test_db01,test_db.T2’)
create_branch将一个分支合并到主分支。参数: table:目标表标识符,不能为空。 branch:要合并的分支名称。 tag:新标签的名称,不能为空。CALL sys.create_branch(table => ’test_db.T’, branch => ’test_branch’) CALL sys.create_branch(table => ’test_db.T’, branch => ’test_branch’, tag =>‘my_tag’)
delete_branch删除一个分支。参数: table:目标表标识符,不能为空。 branch:要删除的分支名称。如果指定多个分支,分隔符为 ‘,’。CALL sys.delete_branch(table => ’test_db.T’, branch => ’test_branch’)
fast_forward将一个分支快速推进到主分支。参数: table:目标表标识符,不能为空。 branch:要合并的分支名称。CALL sys.fast_forward(table => ’test_db.T’, branch => ’test_branch’)
reset_consumer重置或删除消费者。参数: identifier:目标表标识符,不能为空。 consumerId:要重置或删除的消费者。 nextSnapshotId (Long):消费者的新的下一个快照ID。– 重置消费者中的新的下一个快照ID CALL sys.reset_consumer(table => ‘default.T’, consumerId =>‘myid’, nextSnapshotId => 10) – 删除消费者 CALL sys.reset_consumer(table => ‘default.T’, consumerId =>‘myid’)
mark_partition_done标记分区已完成。参数: table:目标表标识符,不能为空。 partitions:需要标记为已完成的分区,如果指定多个分区,分隔符为 ‘;’。– 标记单个分区已完成 CALL sys.mark_partition_done(table => ‘default.T’, parititions => ‘day=2024-07-01’) – 标记多个分区已完成 CALL sys.mark_partition_done(table => ‘default.T’, parititions => ‘day=2024-07-01;day=2024-07-02’)

拓展:

  • 存储过程的作用:这些Spark存储过程为Paimon表的管理和操作提供了强大的功能集合。例如,compact 存储过程通过压缩文件优化存储,不同的参数设置可以满足对特定分区、不同排序策略下的压缩需求,还能针对长时间未更新的分区进行清理压缩,有助于提升存储效率和查询性能。expire_snapshotsexpire_partitions 分别对快照和分区进行过期管理,这在数据生命周期管理中非常重要,可以自动清理不再需要的数据,释放存储空间。
  • 数据迁移与维护migrate_tablemigrate_file 实现了从Hive表到Paimon表的迁移,为不同存储系统间的数据转换提供了便利。remove_orphan_files 则专注于清理孤立文件,确保数据存储的整洁,避免无用文件占用空间。repair 过程用于同步文件系统和元存储的信息,保证数据的一致性和可访问性。
  • 版本与分支管理create_tagcreate_tag_from_timestampdelete_tagrollback 等存储过程构成了版本管理体系,允许用户创建、删除标签,并基于标签或快照ID进行回滚操作,方便对数据版本进行灵活控制。而 create_branchdelete_branchfast_forward 则提供了分支管理功能,类似于版本控制系统中的分支操作,有助于在数据处理过程中进行并行开发、测试等操作,然后再合并回主分支。
  • 其他操作reset_consumer 针对消费者相关信息进行重置或删除,在涉及数据消费和处理流程的场景中,能够灵活调整消费者状态。mark_partition_done 为分区标记完成状态,这在一些需要跟踪分区处理进度的复杂数据处理任务中非常有用。这些存储过程共同构成了一个全面的Paimon数据管理工具集,满足了各种数据管理和处理的需求。