
Integrating Doris with AWS EMR

This article describes how to quickly integrate Doris with Amazon EMR and, using Hive access as an example, demonstrates integration with the AWS Glue Data Catalog and Amazon S3.

Environment Preparation

01 Create an Amazon EMR Cluster

First, you need an AWS EMR cluster that includes the necessary components, such as Hive and Hadoop. You can choose whether or not to use the AWS Glue Data Catalog: Doris can access either the Glue metadata service or the default Hive Metastore.

02 Verify Network Security Groups

Modify the inbound rules of the security groups for the EMR Master/Core/Task nodes as needed, so that the nodes where Doris runs can reach the ports of the relevant services, such as the Hive Metastore Thrift port and the HDFS NameNode and DataNode ports.

Integrating with EMR

Here we take accessing EMR Hive as an example to show how Doris integrates with EMR.

01 Create a Hive Catalog

Here is a basic example of creating a Hive Catalog:

CREATE CATALOG hive_catalog PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://<EMR_MASTER_IP>:9083'
);
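
If the Thrift port is reachable, you can verify the catalog right away. The following is a minimal sketch; <db> and <table> are placeholders for a database and table that already exist in your Hive Metastore:

-- Switch to the new catalog and list the Hive databases it exposes
SWITCH hive_catalog;
SHOW DATABASES;

-- Query an existing Hive table (replace <db> and <table> with real names)
SELECT * FROM <db>.<table> LIMIT 10;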

If Hive data is stored on S3, you also need to add S3 connection and authentication information:

CREATE CATALOG hive_catalog_s3 PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://<EMR_MASTER_IP>:9083',
    's3.endpoint' = 's3.us-east-1.amazonaws.com',
    's3.region' = 'us-east-1',
    's3.access_key' = '<YOUR_ACCESS_KEY>',
    's3.secret_key' = '<YOUR_SECRET_KEY>'
);
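
With the S3 credentials in place, tables whose data lives on S3 can be queried like any other Hive table. As a small sketch (the database and table names below are placeholders), you can also reference them with a fully qualified name without switching catalogs:

-- Fully qualified access: catalog.database.table
SELECT * FROM hive_catalog_s3.<db>.<s3_backed_table> LIMIT 10;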

If you use the Glue Data Catalog, you need to add Glue connection and authentication information:

CREATE CATALOG glue_catalog PROPERTIES (
    'type' = 'hms',
    'hive.metastore.type' = 'glue',
    'glue.endpoint' = 'https://glue.us-east-1.amazonaws.com',
    'glue.access_key' = '<YOUR_ACCESS_KEY>',
    'glue.secret_key' = '<YOUR_SECRET_KEY>'
);
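
Doris caches external metadata, so databases or tables registered in Glue outside of Doris may not be visible immediately. A minimal sketch, assuming the catalog name above, is to refresh the cached metadata manually:

-- Refresh cached metadata so newly registered Glue databases and tables become visible
REFRESH CATALOG glue_catalog;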

Integrating S3 Tables

01 Create an S3 Table Bucket

An S3 Table Bucket is the third bucket type offered by S3, alongside the existing General purpose buckets and Directory buckets.

(Figure: AWS S3 Table Bucket)

Here we create a Table Bucket named doris-s3-table-bucket. Once it is created, we get an ARN that identifies the Table Bucket.

(Figure: Creating the S3 Table Bucket)

02 Create an Iceberg Catalog

Create an Iceberg Catalog of type s3tables:

CREATE CATALOG iceberg_s3 PROPERTIES (
    'type' = 'iceberg',
    'iceberg.catalog.type' = 's3tables',
    'warehouse' = 'arn:aws:s3tables:us-east-1:169698000000:bucket/doris-s3-table-bucket',
    's3.region' = 'us-east-1',
    's3.endpoint' = 's3.us-east-1.amazonaws.com',
    's3.access_key' = 'AKIASPAWQE3ITEXAMPLE',
    's3.secret_key' = 'l4rVnn3hCmwEXAMPLE/lht4rMIfbhVfEXAMPLE'
);

03 Access S3 Tables

Doris > SWITCH iceberg_s3;
 
Doris > SHOW DATABASES;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| my_namespace       |
| mysql              |
+--------------------+
 
Doris > USE my_namespace;
 
Doris > SHOW TABLES;
+------------------------+
| Tables_in_my_namespace |
+------------------------+
| my_table               |
+------------------------+
 
Doris > SELECT * FROM my_table;
+------+------+-------+
| id   | name | value |
+------+------+-------+
|    1 | ABC  |   100 |
|    2 | XYZ  |   200 |
+------+------+-------+

04 Create an S3 Tables Table and Write Data

Doris > CREATE TABLE partition_table (
    ->   `ts` DATETIME COMMENT 'ts',
    ->   `id` INT COMMENT 'col1',
    ->   `pt1` STRING COMMENT 'pt1',
    ->   `pt2` STRING COMMENT 'pt2'
    -> )
    -> PARTITION BY LIST (day(ts), pt1, pt2) ();
 
Doris > INSERT INTO partition_table VALUES
    -> ("2024-01-01 08:00:00", 1000, "us-east", "PART1"),
    -> ("2024-01-02 10:00:00", 1002, "us-sout", "PART2");
Query OK, 2 rows affected
{'status':'COMMITTED', 'txnId':'1736935786473'}
 
Doris > SELECT * FROM partition_table;
+----------------------------+------+---------+-------+
| ts                         | id   | pt1     | pt2   |
+----------------------------+------+---------+-------+
| 2024-01-02 10:00:00.000000 | 1002 | us-sout | PART2 |
| 2024-01-01 08:00:00.000000 | 1000 | us-east | PART1 |
+----------------------------+------+---------+-------+
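
Since day(ts), pt1, and pt2 are the table's partition columns/transforms, typical queries will filter on them. A simple sketch that reads back a single partition:

-- Filter on a partition column; Doris can use Iceberg partition metadata to skip irrelevant data files
SELECT * FROM partition_table WHERE pt1 = "us-east";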

05 Time Travel

We can insert another batch of data and then use the iceberg_meta() function to view the Iceberg snapshots:

Doris > INSERT INTO partition_table VALUES
    -> ("2024-01-03 08:00:00", 1000, "us-east", "PART1"),
    -> ("2024-01-04 10:00:00", 1002, "us-sout", "PART2");
Query OK, 2 rows affected (9.76 sec)
{'status':'COMMITTED', 'txnId':'1736935786474'}
Doris > SELECT * FROM iceberg_meta(
    ->     'table' = 'iceberg_s3.my_namespace.partition_table',
    ->     'query_type' = 'snapshots'
    -> )\G
*************************** 1. row ***************************
 committed_at: 2025-01-15 23:27:01
  snapshot_id: 6834769222601914216
    parent_id: -1
    operation: append
manifest_list: s3://80afcb3f-6edf-46f2-7fhehwj6cengfwc7n6iz7ipzakd7quse1b--table-s3/metadata/snap-6834769222601914216-1-a6b2230d-fc0d-4c1d-8f20-94bb798f27b1.avro
      summary: {"added-data-files":"2","added-records":"2","added-files-size":"5152","changed-partition-count":"2","total-records":"2","total-files-size":"5152","total-data-files":"2","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0","iceberg-version":"Apache Iceberg 1.6.1 (commit 8e9d59d299be42b0bca9461457cd1e95dbaad086)"}
*************************** 2. row ***************************
 committed_at: 2025-01-15 23:30:00
  snapshot_id: 5670090782912867298
    parent_id: 6834769222601914216
    operation: append
manifest_list: s3://80afcb3f-6edf-46f2-7fhehwj6cengfwc7n6iz7ipzakd7quse1b--table-s3/metadata/snap-5670090782912867298-1-beeed339-be96-4710-858b-f39bb01cc3ff.avro
      summary: {"added-data-files":"2","added-records":"2","added-files-size":"5152","changed-partition-count":"2","total-records":"4","total-files-size":"10304","total-data-files":"4","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0","iceberg-version":"Apache Iceberg 1.6.1 (commit 8e9d59d299be42b0bca9461457cd1e95dbaad086)"}

Use the VERSION AS OF syntax to query different snapshots:

Doris > SELECT * FROM partition_table FOR VERSION AS OF 5670090782912867298;
+----------------------------+------+---------+-------+
| ts                         | id   | pt1     | pt2   |
+----------------------------+------+---------+-------+
| 2024-01-04 10:00:00.000000 | 1002 | us-sout | PART2 |
| 2024-01-03 08:00:00.000000 | 1000 | us-east | PART1 |
| 2024-01-01 08:00:00.000000 | 1000 | us-east | PART1 |
| 2024-01-02 10:00:00.000000 | 1002 | us-sout | PART2 |
+----------------------------+------+---------+-------+
 
Doris > SELECT * FROM partition_table FOR VERSION AS OF 6834769222601914216;
+----------------------------+------+---------+-------+
| ts                         | id   | pt1     | pt2   |
+----------------------------+------+---------+-------+
| 2024-01-02 10:00:00.000000 | 1002 | us-sout | PART2 |
| 2024-01-01 08:00:00.000000 | 1000 | us-east | PART1 |
+----------------------------+------+---------+-------+
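
Besides snapshot IDs, Doris also supports reading an Iceberg table as of a point in time with FOR TIME AS OF. A sketch using an illustrative timestamp that falls between the two commits shown above (which would return the data of the first snapshot):

-- Time travel by timestamp instead of snapshot ID
SELECT * FROM partition_table FOR TIME AS OF "2025-01-15 23:28:00";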

06 Access S3 Tables with EMR Spark

Data written with Doris can also be accessed with Spark:

spark-shell --jars /usr/share/aws/iceberg/lib/iceberg-spark-runtime-3.5_2.12-1.6.1-amzn-1.jar \
--packages software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.3 \
--conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog \
--conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:us-east-1:169698000000:bucket/doris-s3-table-bucket \
--conf spark.sql.defaultCatalog=s3tablesbucket \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
scala> spark.sql("SELECT * FROM s3tablesbucket.my_namespace.`partition_table` ").show()
+-------------------+----+-------+-----+
|                 ts|  id|    pt1|  pt2|
+-------------------+----+-------+-----+
|2024-01-02 10:00:00|1002|us-sout|PART2|
|2024-01-01 08:00:00|1000|us-east|PART1|
|2024-01-04 10:00:00|1002|us-sout|PART2|
|2024-01-03 08:00:00|1000|us-east|PART1|
+-------------------+----+-------+-----+