Flink SelectDB Connector

The Flink SelectDB Connector supports writing upstream data into SelectDB through the Flink DataStream API and Flink SQL.

Connector v2.0.0

Supported Versions

Flink 1.13-1.17

Usage

Maven Dependency

<dependency>
    <groupId>com.selectdb</groupId>
    <artifactId>flink-selectdb-connector-1.16</artifactId>
    <version>2.0.0</version>
</dependency>

More versions are available here.

Runtime Jar

Alternatively, download the JAR directly from here and place it under FLINK_HOME/lib to use it.

Usage Examples

FlinkSQL

-- enable checkpointing
SET 'execution.checkpointing.interval' = '30s';
-- disable operator chaining (splits the sink's Writer and Committer so the load can run asynchronously)
SET 'pipeline.operator-chaining' = 'false';
CREATE TABLE selectdb_sink (
name STRING,
age INT
) 
WITH (
  'connector' = 'selectdb',
  'load-url' = 'xxx.privatelink.aliyun.com:47057',
  'jdbc-url' = 'xxx.privatelink.aliyun.com:30523',
  'cluster-name' = 'clustername',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '',
  -- CSV write
  -- 'sink.properties.file.column_separator' = '\x01',
  -- 'sink.properties.file.line_delimiter' = '\x02',
  -- JSON write
  'sink.properties.file.type' = 'json',
  'sink.properties.file.strip_outer_array' = 'false',
  -- synchronize delete events
  'sink.enable-delete' = 'false'
);

DataStream

// enable checkpoint
env.enableCheckpointing(10000);
 
SelectdbSink.Builder<String> builder = SelectdbSink.builder();
SelectdbOptions.Builder selectdbBuilder = SelectdbOptions.builder();
selectdbBuilder.setLoadUrl("ip:httpPort")
        .setJdbcUrl("ip:jdbcPort")
        .setClusterName("clustername")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");
 
Properties properties = new Properties();
// CSV write
properties.setProperty("file.column_separator", ",");
properties.setProperty("file.line_delimiter", "\n");
properties.setProperty("file.type", "csv");
// JSON write
//properties.setProperty("file.strip_outer_array", "false");
//properties.setProperty("file.type", "json");
 
// sync from Kafka or another source whose data quality meets the format requirements
SelectdbExecutionOptions.Builder executionBuilder = SelectdbExecutionOptions.builder();
executionBuilder.setLoadProps(properties);
 
builder.setSelectdbExecutionOptions(executionBuilder.build())
        .setSerializer(new SimpleStringSerializer()) // serialize records as plain strings
        .setSelectdbOptions(selectdbBuilder.build());
 
// sync from a CDC source
//SelectdbExecutionOptions.Builder  executionBuilder = SelectdbExecutionOptions.builder();
//executionBuilder.setLoadProps(properties).setDeletable(true);
//
//builder.setSelectdbExecutionOptions(executionBuilder.build())
//        .setSerializer(JsonDebeziumSchemaSerializer.builder().setSelectdbOptions(selectdbBuilder.build()).build())
//        .setSelectdbOptions(selectdbBuilder.build());
 
// mock CSV string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("selectdb",1));
DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
 
source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "," + t.f1)
      .sinkTo(builder.build())
      .disableChaining(); // split the sink's Writer and Committer operators so the load can run asynchronously
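
To run the commented CDC path above end to end, the sink can be paired with a Flink CDC source. The following is a minimal sketch rather than an official example: it assumes the flink-connector-mysql-cdc dependency (MySqlSource, JsonDebeziumDeserializationSchema) is on the classpath, the host, database, table, and credentials are placeholders, and selectdbBuilder is the options builder from the snippet above.

// minimal CDC-to-SelectDB sketch (assumes flink-connector-mysql-cdc on the classpath;
// hostname, database, table, and credentials are placeholders)
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("127.0.0.1")
        .port(3306)
        .databaseList("mysql_db")
        .tableList("mysql_db.tbl1")
        .username("root")
        .password("password")
        .deserializer(new JsonDebeziumDeserializationSchema()) // emit change events as Debezium JSON
        .build();

Properties cdcProps = new Properties();
cdcProps.setProperty("file.type", "json");
cdcProps.setProperty("file.strip_outer_array", "false");

SelectdbSink.Builder<String> cdcBuilder = SelectdbSink.builder();
cdcBuilder.setSelectdbExecutionOptions(
                SelectdbExecutionOptions.builder()
                        .setLoadProps(cdcProps)
                        .setDeletable(true) // propagate delete events to SelectDB
                        .build())
        .setSerializer(JsonDebeziumSchemaSerializer.builder().setSelectdbOptions(selectdbBuilder.build()).build())
        .setSelectdbOptions(selectdbBuilder.build());

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
   .sinkTo(cdcBuilder.build())
   .disableChaining(); // keep Writer and Committer unchained here as well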

Configuration Options

| Key | Default Value | Required | Description |
| --- | --- | --- | --- |
| load-url | - | Y | SelectDB load URL, e.g. connection-address:httpPort |
| jdbc-url | - | Y | SelectDB query URL, e.g. connection-address:jdbcPort |
| cluster-name | - | Y | SelectDB cluster name; a warehouse may contain multiple clusters, so choose the one you need |
| table.identifier | - | Y | Table to write to, e.g. db.table |
| username | - | Y | Username |
| password | - | Y | Password |
| sink.properties | - | Y | Write properties. For CSV: sink.properties.file.type='csv', sink.properties.file.column_separator=',', sink.properties.file.line_delimiter='\n'. For JSON: sink.properties.file.type='json', sink.properties.file.strip_outer_array='false' |
| sink.buffer-size | 5242880 (5MB) | N | Maximum buffer capacity in bytes; once exceeded, data is flushed to object storage. Default 5MB. |
| sink.buffer-count | 10000 | N | Maximum number of buffered rows; once exceeded, data is flushed to object storage. Default 10000. |
| sink.max-retries | 3 | N | Maximum number of retries in the Commit phase (Copy Into execution). Default 3. |
| sink.enable-delete | false | N | Whether to synchronize delete events. Default false. |
| sink.flush.queue-size | 1 | N | Size of the buffer queue used when uploading to object storage. |
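
As the two examples above show, the keys under the sink.properties. prefix in Flink SQL correspond to the plain keys passed to setLoadProps in the DataStream API; the prefix is simply dropped. A minimal sketch of that correspondence:

// 'sink.properties.file.type' = 'json' in Flink SQL becomes "file.type" here
Properties loadProps = new Properties();
loadProps.setProperty("file.type", "json");
loadProps.setProperty("file.strip_outer_array", "false");

SelectdbExecutionOptions executionOptions = SelectdbExecutionOptions.builder()
        .setLoadProps(loadProps) // the sink.properties.* rows in the table above
        .setDeletable(false)     // equivalent of 'sink.enable-delete'
        .build();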

Example: Using Flink CDC to Sync Multiple Tables or an Entire Database

Syntax

<FLINK_HOME>/bin/flink run \
    -c com.selectdb.flink.tools.cdc.CdcTools \
    lib/flink-selectdb-connector-1.17-2.1.0-SNAPSHOT.jar \
    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
    --database <selectdb-database-name> \
    [--job-name <flink-job-name>] \
    [--table-prefix <selectdb-table-prefix>] \
    [--table-suffix <selectdb-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
    --sink-conf <selectdb-sink-conf> [--sink-conf <selectdb-sink-conf> ...] \
    [--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]
  • --job-name Flink job name, optional.
  • --database Name of the database in SelectDB to sync into.
  • --table-prefix Prefix for SelectDB table names, e.g. --table-prefix ods_.
  • --table-suffix Same as above, but a suffix for SelectDB table names.
  • --including-tables MySQL tables to sync. Multiple tables can be separated with "|", and regular expressions are supported. For example, --including-tables table1|tbl.* syncs table1 and all tables whose names start with tbl.
  • --excluding-tables Tables to exclude from syncing; same usage as above.
  • --mysql-conf MySQL CDC source configuration, e.g. --mysql-conf hostname=127.0.0.1. You can view all MySQL-CDC options here; hostname/username/password/database-name are required.
  • --oracle-conf Oracle CDC source configuration, e.g. --oracle-conf hostname=127.0.0.1. You can view all Oracle-CDC options here; hostname/username/password/database-name/schema-name are required.
  • --sink-conf All configuration options of the SelectDB sink; see the configuration table above for the full list.
  • --table-conf Configuration options for the SelectDB table, i.e. what goes into its properties.

Notes:

  1. When syncing, the corresponding Flink CDC dependency must be added under $FLINK_HOME/lib, e.g. flink-sql-connector-mysql-cdc-${version}.jar or flink-sql-connector-oracle-cdc-${version}.jar.

  2. Whole-database sync requires Flink 1.15 or later. Connector snapshot packages can be downloaded here.

MySQL Sync Example

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c com.selectdb.flink.tools.cdc.CdcTools \
    lib/flink-selectdb-connector-1.17-2.1.0-SNAPSHOT.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=mysql_db \
    --including-tables "tbl1|test.*" \
    --sink-conf load-url=xxx.privatelink.aliyun.com:47057 \
    --sink-conf jdbc-url=xxx.privatelink.aliyun.com:21477 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf cluster-name=Cluster01

Oracle Sync Example

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c com.selectdb.flink.tools.cdc.CdcTools \
    lib/flink-selectdb-connector-1.17-2.1.0-SNAPSHOT.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf load-url=xxx.privatelink.aliyun.com:47057 \
    --sink-conf jdbc-url=xxx.privatelink.aliyun.com:21477 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf cluster-name=Cluster01

PostgreSQL Sync Example

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c com.selectdb.flink.tools.cdc.CdcTools \
    lib/flink-selectdb-connector-1.17-2.1.0-SNAPSHOT.jar \
    postgres-sync-database \
    --database db1 \
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf load-url=xxx.privatelink.aliyun.com:47057 \
    --sink-conf jdbc-url=xxx.privatelink.aliyun.com:21477 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf cluster-name=Cluster01

SQLServer Sync Example

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c com.selectdb.flink.tools.cdc.CdcTools \
    lib/flink-selectdb-connector-1.17-2.1.0-SNAPSHOT.jar \
    sqlserver-sync-database \
    --database db1 \
    --sqlserver-conf hostname=127.0.0.1 \
    --sqlserver-conf port=1433 \
    --sqlserver-conf username=sa \
    --sqlserver-conf password="123456" \
    --sqlserver-conf database-name=CDC_DB \
    --sqlserver-conf schema-name=dbo \
    --including-tables "tbl1|test.*" \
    --sink-conf load-url=xxx.privatelink.aliyun.com:47057 \
    --sink-conf jdbc-url=xxx.privatelink.aliyun.com:21477 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf cluster-name=Cluster01

Connector v1.0.0

Supported Versions

| Flink | Java | Runtime Jar |
| --- | --- | --- |
| 1.13 | 8 | scala2.11: flink-selectdb-connector-1.13_2.11-1.0.0; scala2.12: flink-selectdb-connector-1.13_2.12-1.0.0 |
| 1.14 | 8 | scala2.11: flink-selectdb-connector-1.14_2.11-1.0.0; scala2.12: flink-selectdb-connector-1.14_2.12-1.0.0 |
| 1.15, 1.16, 1.17 | 8 | flink-selectdb-connector-1.0.0 |

Usage Examples

Flink 1.13

FlinkSQL
CREATE TABLE selectdb_sink (
name STRING,
age INT
) 
WITH (
  'connector' = 'selectdb',
  'load-url' = 'xxx.privatelink.aliyun.com:47057',
  'jdbc-url' = 'xxx.privatelink.aliyun.com:30523',
  'cluster-name' = 'clustername',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '',
  'sink.properties.file.type' = 'json',
  'sink.properties.file.strip_outer_array' = 'true'
);
DataStream
Properties pro = new Properties();
pro.setProperty("file.type", "json");
pro.setProperty("file.strip_outer_array", "true");
env.fromElements("{\"name\": \"zhangsan\", \"age\": \"1\"}")
   .addSink(
           DorisSink.sink(
                   DorisExecutionOptions.builder()
                           .setStreamLoadProp(pro).build(),
                   DorisOptions.builder()
                           .setLoadUrl("xxx.privatelink.aliyun.com:47057")
                           .setJdbcUrl("xxx.privatelink.aliyun.com:30523")
                           .setClusterName("clustername")
                           .setTableIdentifier("database.tablename")
                           .setUsername("admin")
                           .setPassword("").build()
           ));
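
For CSV input on Flink 1.13, only the load properties and the input strings change. The sketch below reuses exactly the builder calls from the JSON example above; the endpoint and credential values remain placeholders.

// CSV variant of the Flink 1.13 sink (same builder calls as the JSON example)
Properties csvPro = new Properties();
csvPro.setProperty("file.type", "csv");
csvPro.setProperty("file.column_separator", ",");
csvPro.setProperty("file.line_delimiter", "\n");
env.fromElements("zhangsan,1")
   .addSink(
           DorisSink.sink(
                   DorisExecutionOptions.builder()
                           .setStreamLoadProp(csvPro).build(),
                   DorisOptions.builder()
                           .setLoadUrl("xxx.privatelink.aliyun.com:47057")
                           .setJdbcUrl("xxx.privatelink.aliyun.com:30523")
                           .setClusterName("clustername")
                           .setTableIdentifier("database.tablename")
                           .setUsername("admin")
                           .setPassword("").build()
           ));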

Flink 1.14+

FlinkSQL
-- enable checkpointing
SET 'execution.checkpointing.interval' = '30s';
-- disable operator chaining (splits the sink's Writer and Committer so the load can run asynchronously)
SET 'pipeline.operator-chaining' = 'false';
CREATE TABLE selectdb_sink (
name STRING,
age INT
) 
WITH (
  'connector' = 'selectdb',
  'load-url' = 'xxx.privatelink.aliyun.com:47057',
  'jdbc-url' = 'xxx.privatelink.aliyun.com:30523',
  'cluster-name' = 'clustername',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '',
  -- CSV write
  -- 'sink.properties.file.column_separator' = '\x01',
  -- 'sink.properties.file.line_delimiter' = '\x02',
  -- JSON write
  'sink.properties.file.type' = 'json',
  'sink.properties.file.strip_outer_array' = 'false',
  -- synchronize delete events
  'sink.enable-delete' = 'true'
);
DataStream
// enable checkpoint
env.enableCheckpointing(30000);
 
DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setLoadUrl("ip:httpPort")
        .setJdbcUrl("ip:jdbcPort")
        .setClusterName("clustername")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");
 
Properties properties = new Properties();
// CSV write
properties.setProperty("file.column_separator", ",");
properties.setProperty("file.line_delimiter", "\n");
properties.setProperty("file.type", "csv");
// JSON write
//properties.setProperty("file.strip_outer_array", "false");
//properties.setProperty("file.type", "json");
 
// sync from Kafka or another source whose data quality meets the format requirements
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setStreamLoadProp(properties);
 
builder.setDorisExecutionOptions(executionBuilder.build())
        .setSerializer(new SimpleStringSerializer()) // serialize records as plain strings
        .setDorisOptions(dorisBuilder.build());
 
// sync from a CDC source
//DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
//executionBuilder.setStreamLoadProp(properties).setDeletable(true);
//
//builder.setDorisExecutionOptions(executionBuilder.build())
//        .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisBuilder.build()).build())
//        .setDorisOptions(dorisBuilder.build());
 
// mock CSV string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris",1));
DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
 
source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "," + t.f1)
      .sinkTo(builder.build())
      .disableChaining(); // split the sink's Writer and Committer operators so the load can run asynchronously

Configuration Options

| Key | Default Value | Required | Description |
| --- | --- | --- | --- |
| load-url | - | Y | SelectDB load URL, e.g. connection-address:httpPort |
| jdbc-url | - | Y | SelectDB query URL, e.g. connection-address:jdbcPort |
| cluster-name | - | Y | SelectDB cluster name |
| table.identifier | - | Y | Table to write to, e.g. db.table |
| username | - | Y | Username |
| password | - | Y | Password |
| sink.properties | - | Y | Write properties. For CSV: sink.properties.file.type='csv', sink.properties.file.column_separator=',', sink.properties.file.line_delimiter='\n'. For JSON: sink.properties.file.type='json', sink.properties.file.strip_outer_array='false' (true on Flink 1.13) |
| sink.enable-delete | false | N | Whether to synchronize delete events. Default false. |

Flink 1.13 Parameters

| Key | Default Value | Required | Description |
| --- | --- | --- | --- |
| sink.batch.size | 10000 | N | Maximum number of rows per flush |
| sink.max-retries | 3 | N | Number of retries after a flush failure |
| sink.batch.interval | 10s | N | Flush interval; default 10s. Supported time units: ms/s/min/h/d (milliseconds if no unit is given). Set to 0 to disable periodic flushing. |
| sink.batch.bytes | 10485760 (10MB) | N | Maximum number of bytes per flush. Default 10MB. |

Flink 1.14+ Parameters

| Key | Default Value | Required | Description |
| --- | --- | --- | --- |
| sink.buffer-size | 1024*1024 (1MB) | N | Size of the write buffer, in bytes. Default 1MB; changing it is not recommended. |
| sink.buffer-count | 3 | N | Number of write buffers. Default 3; changing it is not recommended. |
| sink.max-retries | 1 | N | Maximum number of retries in the Commit phase. Default 1. |
| sink.check-interval | 10000 | N | Interval for periodic file writes, in milliseconds. Default 10 seconds; changing it is not recommended. |