Doris Kafka Connector
Kafka Connect (opens in a new tab) 是一款可扩展、可靠的在 Apache Kafka 和其他系统之间进行数据传输的工具,可以定义 Connectors 将大量数据迁入迁出 Kafka。
Doris 社区提供的 Sink Connector 插件,可以将 Kafka topic 中的数据写入到 SelectDB Cloud 中。
下载方式
• 采用 Maven 时,引入依赖的方式如下所示。
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>doris-kafka-connector</artifactId>
<version>1.0.0</version>
</dependency>
• 手动下载 Doris-Kafka-Connector (opens in a new tab),并将 JAR 包放到 $KAFKA_HOME/libs 目录下。
Doris Kafka Connector 使用
Standalone 模式启动
配置 connect-standalone.properties
# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092
配置 doris-connector-sink.properties 在 config 目录下创建 doris-connector-sink.properties,并配置如下内容:
name=test-doris-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=${selectdb.url}
doris.http.port=${selectdb.http.port}
doris.query.port=${selectdb.query.port}
doris.user=${selectdb.user}
doris.database=${selectdb.database}
doris.password=${selectdb.password}
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
启动 Standalone
$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/doris-connector-sink.properties
:::note 注意:一般不建议在生产环境中使用 standalone 模式 :::
Distributed 模式启动
配置 connect-distributed.properties
# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092
# 修改 group.id,同一集群的需要一致
group.id=connect-cluster
启动 Distributed
$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
增加 Connector
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"test-selectdb-sink-cluster",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"topic_test",
"doris.topic2table.map": "topic_test:test_kafka_tbl",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"${selectdb.url}",
"doris.user":"${selectdb.user}",
"doris.database":"${selectdb.database}",
"doris.password":"${selectdb.password}",
"doris.http.port":"${selectdb.http.port}",
"doris.query.port":"${selectdb.query.port}",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}'
操作 Connector
# 查看 connector 状态
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# 删除当前 connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
# 暂停当前 connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# 重启当前 connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# 重启 connector 内的 tasks
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST
参考:Connect REST Interface (opens in a new tab)
:::note
注意 kafka-connect 首次启动时,会往 kafka 集群中创建 config.storage.topic
offset.storage.topic
status.storage.topic
三个 topic 用于记录 kafka-connect 的共享连接器配置、偏移数据和状态更新。How to Use Kafka Connect - Get Started (opens in a new tab)
:::
访问 SSL 认证的 Kafka 集群
通过 kafka-connect 访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(client.truststore.jks)。您可以在 connect-distributed.properties
文件中增加以下配置:
# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234
关于通过 kafka-connect 连接 SSL 认证的 Kafka 集群配置说明可以参考:Configure Kafka Connect (opens in a new tab)
死信队列
默认情况下,转换过程中或转换过程中遇到的任何错误都会导致连接器失败。每个连接器配置还可以通过跳过它们来容忍此类错误,可选择将每个错误和失败操作的详细信息以及有问题的记录(具有不同级别的详细信息)写入死信队列以便记录。
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1
配置项
Key | Default Value | Required | Description |
---|---|---|---|
name | - | Y | Connect 应用名称,必须是在 Kafka Connect 环境中唯一 |
connector.class | - | Y | org.apache.doris.kafka.connector.DorisSinkConnector |
topics | - | Y | 订阅的 topic 列表,逗号分隔: topic1,topic2 |
doris.urls | - | Y | SelectDB 的连接地址 |
doris.http.port | - | Y | SelectDB 的 HTTP 协议端口 |
doris.query.port | - | Y | SelectDB 的 MySQL 协议端口 |
doris.user | - | Y | SelectDB 用户名 |
doris.password | - | Y | SelectDB 密码 |
doris.database | - | Y | 要写入的数据库。多个库时可以为空,同时在 topic2table.map 需要配置具体的库名称 |
doris.topic2table.map | - | N | topic 和 table 表的对应关系,例:topic1:tb1,topic2:tb2 默认为空,表示 topic 和 table 名称一一对应。 多个库的格式为 topic1:db1.tbl1,topic2:db2.tbl2 |
buffer.count.records | 10000 | N | 在 flush 到 selectdb 之前,每个 Kafka 分区在内存中缓冲的记录数。默认 10000 条记录 |
buffer.flush.time | 120 | N | buffer 刷新间隔,单位秒,默认120秒 |
buffer.size.bytes | 5000000(5MB) | N | 每个 Kafka 分区在内存中缓冲的记录的累积大小,单位字节,默认5MB |
jmx | true | N | 通过 JMX 获取 Connector 内部监控指标, 请参考: Doris-Connector-JMX (opens in a new tab) |
enable.delete | false | N | 是否同步删除记录, 默认 false |
label.prefix | ${name} | N | 通过 Stream load 导入数据时的 label 前缀。默认为 Connector 应用名称 |
auto.redirect | true | N | 是否重定向 Stream load 请求。开启后可以直接通过 Stream load 将数据导入至 SelectDB 中 |
load.model | stream_load | N | 导入数据的方式。支持 stream_load 直接数据导入到 SelectDB 中;同时也支持 copy_into 的方式将数据先上传至对象存储中,然后将数据加载至 SelectDB 中 |
sink.properties.* | 'sink.properties.format':'json' , 'sink.properties.read_json_by_line':'true' | N | Stream load 的导入参数。 例如: 定义列分隔符 'sink.properties.column_separator':',' 详细参数参考这里 (opens in a new tab) |
delivery.guarantee | at_least_once | N | 消费 Kafka 数据导入至 SelectDB 时,数据一致性的保障方式。 支持 at_least_once exactly_once ,默认为 at_least_once 当前 SelectDB 只能保障使用 Copy into 导入的数据 exactly_once ,Stream load 导入数据的 exactly_once 也将在后续版本中支持 |
其他 Kafka Connect Sink 通用配置项可参考:connect_configuring (opens in a new tab)