SelectDB Cloud
开发指南
生态集成
Spark SelectDB Connector

Spark SelectDB Connector

快速介绍

Spark Selectdb Connector支持将上游的大数据量写入到SelectDB Cloud中。

实现原理

Spark SelectDB Connector 底层实现依赖于SelectDB Cloud的stage导入方式,通过调用SelectDB Cloud api (/copy/upload),返回一个重定向的对象存储地址,使用http的方式向对象存储地址发送字节流,最后通过copy into(/copyinto)的方式将对象存储桶中的数据导入到SelectDB Cloud中。Stage导入方式的具体使用可以参阅官网Stage导入 - 云原生实时数据仓库

版本支持

Spark2.3/3.1/3.2

使用

Maven引用

<dependency>
    <groupId>com.selectdb</groupId>
    <artifactId>spark-selectdb-connector-3.2_2.12</artifactId>
    <version>1.0.0</version>
</dependency>

更多版本可参考这里 (opens in a new tab)

Runtime Jar

也可在这里 (opens in a new tab)直接下载jar包。将下载的jar包复制到 SparkClassPath 中即可使用 spark-selectdb-connector。例如,Local 模式运行的 Spark,将此文件放入 jars/ 文件夹下。Yarn集群模式运行的Spark,则将此文件放入预部署包中。例如将 spark-selectdb-connector-2.3.4-2.11-1.0.0.jar 上传到 hdfs并在spark.yarn.jars参数上添加 hdfs上的Jar包路径

  • 上传spark-selectdb-connector-2.3.4-2.11-1.0.0.jar 到hdfs。
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-selectdb-connector-2.3.4-2.11-1.0.0.jar /spark-jars/
  • 在集群中添加spark-selectdb-connector-3.1.2-2.12-1.0.0.jar 依赖。
spark.yarn.jars=hdfs:///spark-jars/doris-spark-connector-3.1.2-2.12-1.0.0.jar

使用示例

通过sparksql的方式写入

val selectdbHttpPort = "127.0.0.1:47968"
val selectdbJdbc = "jdbc:mysql://127.0.0.1:18836/test"
val selectdbUser = "admin"
val selectdbPwd = "selectdb2022"
val selectdbTable = "test.test_order"
  
CREATE TEMPORARY VIEW test_order
USING selectdb
OPTIONS(
 "table.identifier"="test.test_order",
 "jdbc.url"="${selectdbJdbc}",
 "http.port"="${selectdbHttpPort}",
 "user"="${selectdbUser}",
 "password"="${selectdbPwd}",
 "sink.properties.file.type"="json"
);
 
insert into test_order select  order_id,order_amount,order_status from tmp_tb ;

通过DataFrame方式写入

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
  ("1", 100, "待付款"),
  ("2", 200, null),
  ("3", 300, "已收货")
)).toDF("order_id", "order_amount", "order_status")
 
df.write
  .format("selectdb")
  .option("selectdb.http.port", selectdbHttpPort)
  .option("selectdb.table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 4)
  .option("sink.max-retries", 2)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()

配置

KeyDefaultValueCommentRequired
selectdb.http.port--selectdb cloud http地址Y
selectdb.jdbc.url--selectdb cloud jdbc地址,此配置为spark sql所属Y
selectdb.table.identifier--selectdb cloud表名,格式 库名.表名,例如:db1.tbl1Y
user--访问selectdb cloud的用户名Y
password--访问selectdb cloud的密码Y
sink.batch.size100000单次写selectdb cloud的最大行数N
sink.max-retries3写selectdb失败之后的重试次数N
sink.properties.*--copy into的导入参数。例如:"sink.properties.file.type"="json"关于copy into更多的参数说明,请参阅selectdb官网copy into章节N
© 2023 北京飞轮数据科技有限公司 京ICP备2022004029号 | Apache、Apache Doris 以及相关开源项目名称均为 Apache 基金会商标