Stream Load 导入
本文介绍如何通过 Stream Load 导入数据至 SelectDB Cloud 仓库中。
背景信息
Stream Load 是属于同步接口的导入方式,用户通过发送 HTTP 请求将本地文件或数据流导入到 SelectDB Cloud 仓库中。Stream Load 执行并返回导入结果,用户可直接通过请求的返回体判断本次导入是否成功。
Stream Load 主要适用于导入本地文件或通过程序导入数据流中的数据,支持的数据格式包括:CSV(文本)、JSON、PARQUET 和 ORC。
创建导入
Stream Load 通过 HTTP 协议提交和传输数据,这里通过 curl 命令展示如何提交导入。用户也可以通过其他 HTTP Client 进行操作。
语法
# Header 中支持的属性,请参见下面的参数说明。
# 格式为: -H "key1:value1"。
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://host:port/api/{db}/{table}/_stream_load
参数说明
参数名称 | 参数说明 |
---|---|
--location-trusted | 当需要认证时,会将 user 和 password 传递给被重定向到的服务器。 |
-u | 用户名和密码。 |
-T | 需要导入的数据文件。 |
-XPUT | HTTP 请求的 Method,采用 PUT 请求方法。其中 host 为 SelectDB Cloud 仓库的私网地址或公网地址,port 为 HTTP 端口号。 |
由于 Stream Load 使用的是 HTTP 协议,所以导入任务有关的参数主要设置在 Header 中。常用的导入参数如下。
参数名称 | 参数说明 |
---|---|
label | 导入任务的唯一标识。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。label 也可用于防止用户重复导入相同的数据,强烈推荐用户同一批次数据使用相同的 label。这样同一批次数据的重复请求只会被接受一次,保证了 At-Most-Once。当 label 对应的导入作业状态为 CANCELLED 时,该 label 可以再次被使用。 |
format | 指定导入数据格式,支持 CSV、JSON、PARQUET、ORC,默认值为 CSV。支持 csv_with_names(CSV 文件行首过滤)、csv_with_names_and_types(CSV 文件前两行过滤)。 |
line_delimiter | 用于指定导入文件中的换行符,默认为 \n。您可以使用做多个字符的组合作为换行符。 |
column_separator | 用于指定导入文件中的列分隔符,默认为 \t。您可以使用多个字符的组合作为列分隔符。如果是不可见字符,则需要加 \x 作为前缀,使用十六进制来表示分隔符。如Hive文件的分隔符 \x01,需要指定为 -H "column_separator:\x01"。 |
compress_type | 指定文件的压缩格式。目前只支持 CSV 文件的压缩,支持 gz、lzo、bz2、lz4、lzop、deflate 压缩格式。 |
max_filter_ratio | 导入任务的最大容忍率,默认为 0 容忍,取值范围是 0~1。当导入的错误率超过该值,则导入失败。如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。 |
cloud_cluster | 用于指定导入使用的集群。如果不指定,则使用用户的默认集群,如果用户没有设置默认集群,则自动为用户选择一个有权限的集群。 |
where | 导入任务指定的过滤条件。支持对原始数据指定 where 语句进行过滤,被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入 num_rows_unselected。 |
partitions | 待导入数据的 Partition 信息。如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 dpp.abnorm.ALL。 |
columns | 待导入数据的函数变换配置。支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。 |
merge_type | 数据合并类型,默认为 APPEND,表示本次导入是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 表模型。其中 MERGE 类型需要配合 delete 参数使用,以标注 Delete Flag 列。而 DELETE 类型则表示本次导入的所有数据皆为删除数据。 |
delete | 仅在 MERGE 下有意义,表示数据的删除条件 function_column.sequence_col:只适用于 UNIQUE_KEYS,相同 key 列下,保证 value 列按照 source_sequence 列进行 REPLACE, source_sequence 可以是数据源中的列,也可以是表结构中的一列。 |
exec_mem_limit | 导入内存限制。默认为 2 GB,单位为字节。 |
timeout | 指定导入的超时时间,单位:秒,默认是 600 秒。可设置范围为 1~259200 秒。 |
timezone | 指定本次导入所使用的时区,默认为东八区。该参数会影响所有导入涉及的和时区有关的函数结果。 |
two_phase_commit | 是否开启两阶段事务提交模式,默认为 false。开启两阶段事务提交模式后,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为 PRECOMMITTED,用户手动触发 commit 操作之后,数据才可见。 |
由于 Stream Load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户,返回结果样例如下。
{
"TxnId": 17,
"Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 5,
"NumberLoadedRows": 5,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 28,
"LoadTimeMs": 27,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 3,
"CommitAndPublishTimeMs": 18
}
返回结果参数说明如下。
参数名称 | 参数说明 |
---|---|
TxnId | 导入的事务 ID,用户可不感知。 |
Label | 导入 Label,由用户指定或系统自动生成。 |
Status | 导入状态。"Success" 表示导入成功。"Publish Timeout" 表示导入已经完成,只是数据可能会延迟可见,无需重试。"Label Already Exists" 表示 Label 重复,需更换 Label。"Fail" 表示导入失败。 |
ExistingJobStatus | 已存在的 Label 对应的导入作业的状态。这个字段只有在当 Status 为 "Label Already Exists" 时才会显示。用户可以通过这个状态,知晓已存在 Label 对应的导入作业的状态。"RUNNING" 表示作业还在执行。"FINISHED" 表示作业成功。 |
Message | 错误信息提示。 |
NumberTotalRows | 导入总处理的行数。 |
NumberLoadedRows | 成功导入的行数。 |
NumberFilteredRows | 数据质量不合格的行数。 |
NumberUnselectedRows | 被 where 条件过滤的行数。 |
LoadBytes | 导入的字节数。 |
LoadTimeMs | 导入完成时间,单位毫秒。 |
BeginTxnTimeMs | 向 FE 请求开始一个事务所花费的时间,单位:毫秒。 |
StreamLoadPutTimeMs | 向 FE 请求获取导入数据执行计划所花费的时间,单位:毫秒。 |
ReadDataTimeMs | 读取数据所花费的时间,单位:毫秒。 |
WriteDataTimeMs | 执行写入数据操作所花费的时间,单位:毫秒。 |
CommitAndPublishTimeMs | 向Fe请求提交并且发布事务所花费的时间,单位:毫秒。 |
ErrorURL | 如果有数据质量问题,通过访问这个 URL 查看具体错误行。 |
示例
curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" http://host:port/api/test_db/test_table/_stream_load
取消导入
用户无法手动取消 Stream Load,Stream Load 在超时或者导入错误后会被系统自动取消。
查看 Stream Load
您可以通过show stream load来查看已经完成的Stream load任务。默认BE(BackEnd)不保留Stream Load的启用记录,如果您要查看则需要在BE上启用记录,配置参数为:enable_stream_load_record=true,具体操作请参见BE配置项 (opens in a new tab)。
使用示例
-
将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表,使用 Label 用于去重,指定超时时间为 100 秒。
curl --location-trusted -u root -H "label:123" -H "timeout:100" -T test.data http://host:port/api/test_db/test_table/_stream_load
-
将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表,使用 label 用于去重,并且只导入 k1 等于 20180601 的数据。
curl --location-trusted -u root -H "label:123" -H "where: k1=20180601" -T test.data http://host:port/api/test_db/test_table/_stream_load
-
将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表,允许 20% 的错误率(用户是 defalut_cluster 中的)。
curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -T test.data http://host:port/api/test_db/test_table/_stream_load
-
将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表,允许 20% 的错误率,并且指定文件的列名。
curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "columns: k2, k1, v1" -T test.data http://host:port/api/test_db/test_table/_stream_load
-
将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表中的 p1、p2 分区,允许 20% 的错误率。
curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "partitions: p1, p2" -T test.data http://host:port/api/test_db/test_table/_stream_load
-
使用流式方式(streaming)导入。
seq 1 10 | awk '{OFS="\t"}{print $1, $1 * 10}' | curl --location-trusted -u root -T - http://host:port/api/test_db/test_table/_stream_load
-
导入含有 HLL 列的表,可以是表中的列或者数据中的列用于生成 HLL 列,也可使用 hll_empty 补充数据中没有的列。
curl --location-trusted -u root -H "columns: k1, k2, v1=hll_hash(k1), v2=hll_empty()" -T test.data http://host:port/api/test_db/test_table/_stream_load
-
导入数据进行严格模式过滤,并设置时区为 Africa/Abidjan。
curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -T test.data http://host:port/api/test_db/test_table/_stream_load
-
删除与这批导入 key 相同的数据。
curl --location-trusted -u root -H "merge_type: DELETE" -T test.data http://host:port/api/test_db/test_table/_stream_load
-
将这批数据中与 flag 列为 1 的数据相匹配的列删除,其他行正常追加。
curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1" -T test.data http://host:port/api/test_db/test_table/_stream_load
-
导入数据到含有 sequence 列的 UNIQUE_KEYS 表中。
curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T test.data http://host:port/api/test_db/test_table/_stream_load
相关系统配置
FE 配置
stream_load_default_timeout_second
:导入任务的超时时间,单位:秒。默认的 timeout 时间为 600 秒,导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。如果导入的源文件无法在规定时间内完成导入,您可以在 Stream Load 请求中设置单独的超时时间或者调整 FE 的参数 stream_load_default_timeout_second
来设置全局的默认超时时间。
BE 配置
streaming_load_max_mb
:Stream Load 的最大导入大小,单位:MB,默认值为 10240 MB。如果您的原始文件超过该值,则需要调整 BE 参数 streaming_load_max_mb
。