SelectDB Cloud
SQL 手册
语句
Data-Manipulation-Statements
Load
BROKER-LOAD

BROKER-LOAD

Name

BROKER LOAD

Description

该命令主要用于通过 Broker 导入方式读取远端存储(如 HDFS、S3)上的数据导入到 SelectDB 表里。

LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];
  • load_label

    每个导入需要指定一个唯一的 Label。后续可以通过 SHOW LOAD 命令查看这个 label 对应作业的进度。

    [database.]label_name

  • data_desc1

    用于描述一组需要导入的文件。

    [MERGE|APPEND|DELETE]
    DATA INFILE
    (
    "file_path1"[, file_path2, ...]
    )
    [NEGATIVE]
    INTO TABLE `table_name`
    [PARTITION (p1, p2, ...)]
    [COLUMNS TERMINATED BY "column_separator"]
    [FORMAT AS "file_type"]
    [(column_list)]
    [COLUMNS FROM PATH AS (c1, c2, ...)]
    [PRECEDING FILTER predicate]
    [SET (column_mapping)]
    [WHERE predicate]
    [DELETE ON expr]
    [ORDER BY source_sequence]
    [PROPERTIES ("key1"="value1", ...)]
    • [MERGE|APPEND|DELETE]

      数据合并类型。默认为 APPEND,表示本次导入是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 模型表。其中 MERGE 类型需要配合 [DELETE ON] 语句使用,以标注 Delete Flag 列。而 DELETE 类型则表示本次导入的所有数据皆为删除数据。

    • DATA INFILE

      指定需要导入的文件路径。可以是多个。可以使用通配符。路径最终必须匹配到文件,如果只匹配到目录则导入会失败。

    • NEGTIVE

      该关键词用于表示本次导入为一批”负“导入。这种方式仅针对具有整型 SUM 聚合类型的聚合数据表。该方式会将导入数据中,SUM 聚合列对应的整型数值取反。主要用于冲抵之前导入错误的数据。

    • PARTITION(p1, p2, ...)

      可以指定仅导入表的某些分区。不再分区范围内的数据将被忽略。

    • COLUMNS TERMINATED BY

      指定列分隔符。仅在 CSV 格式下有效。仅能指定单字节分隔符。

    • FORMAT AS

      指定文件类型,支持 CSV、PARQUET 和 ORC 格式。默认为 CSV。

    • column list

      用于指定原始文件中的列顺序。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 (opens in a new tab) 文档。

      (k1, k2, tmpk1)

    • COLUMNS FROM PATH AS

      指定从导入文件路径中抽取的列。

    • PRECEDING FILTER predicate

      前置过滤条件。数据首先根据 column listCOLUMNS FROM PATH AS 按顺序拼接成原始数据行。然后按照前置过滤条件进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 (opens in a new tab) 文档。

    • SET (column_mapping)

      指定列的转换函数。

    • WHERE predicate

      根据条件对导入的数据进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 (opens in a new tab) 文档。

    • DELETE ON expr

      需配合 MEREGE 导入模式一起使用,仅针对 Unique Key 模型的表。用于指定导入数据中表示 Delete Flag 的列和计算关系。

    • ORDER BY

      仅针对 Unique Key 模型的表。用于指定导入数据中表示 Sequence Col 的列。主要用于导入时保证数据顺序。

    • PROPERTIES ("key1"="value1", ...)

      指定导入的 format 的一些参数。如导入的文件是json格式,则可以在这里指定json_rootjsonpathsfuzzy_parse等参数。

  • WITH broker_type

    指定需要使用的 Broker 类型,支持HDFSS3 两种。S3 类型的 Broker Load 也称为 S3 Load,可参考 S3 Load (opens in a new tab) 文档。

  • broker_properties

    指定 broker 所需的信息。这些信息通常被用于 Broker 能够访问远端存储系统。如 BOS 或 HDFS。

    (
        "key1" = "val1",
        "key2" = "val2",
        ...
    )
    • load_properties

      指定导入的相关参数。目前支持以下参数:

      • timeout

        导入超时时间。默认为 4 小时。单位秒。

      • max_filter_ratio

        最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。取值范围为 0 到 1。

      • exec_mem_limit

        导入内存限制。默认为 2GB。单位为字节。

      • strict_mode

        是否对数据进行严格限制。默认为 false。

      • timezone

        指定某些受时区影响的函数的时区,如 strftime/alignment_timestamp/from_unixtime 等等,具体请查阅 时区 (opens in a new tab) 文档。如果不指定,则使用 "Asia/Shanghai" 时区

      • load_parallelism

        导入并发度,默认为 1。调大导入并发度会启动多个执行计划同时执行导入任务,加快导入速度。

      • send_batch_parallelism

        用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 max_send_batch_parallelism_per_job,那么作为协调点的 BE 将使用 max_send_batch_parallelism_per_job 的值。

      • load_to_single_tablet

        布尔类型,为 true 表示支持一个任务只导入数据到对应分区的一个 tablet,默认值为 false,作业的任务数取决于整体并发度。该参数只允许在对带有 random 分区的 olap 表导数的时候设置。

Example

  1. 从 HDFS 导入一批数据。

    LOAD LABEL example_db.label1
    (
        DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file.txt")
        INTO TABLE `my_table`
        COLUMNS TERMINATED BY ","
    )
    WITH HDFS
    (
        "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
    );

    导入文件 file.txt,按逗号分隔,导入到表 my_table。当从 HDFS 导入数据时,broker_properties 中需要指定 fs.defaultFS 属性。

  2. 从 HDFS 导入数据,使用通配符匹配两批两批文件,分别导入到两个表中。

    LOAD LABEL example_db.label2
    (
        DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-10*")
        INTO TABLE `my_table1`
        PARTITION (p1)
        COLUMNS TERMINATED BY ","
        (k1, tmp_k2, tmp_k3)
        SET (
            k2 = tmp_k2 + 1,
            k3 = tmp_k3 + 1
        )
        DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-20*")
        INTO TABLE `my_table2`
        COLUMNS TERMINATED BY ","
        (k1, k2, k3)
    )
    WITH HDFS
    (
        "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
    );

    使用通配符匹配导入两批文件 file-10*file-20*。分别导入到 my_table1my_table2 两张表中。其中 my_table1 指定导入到分区 p1 中,并且将导入源文件中第二列和第三列的值 +1 后导入。

  3. 从 HDFS 导入一批数据。

    LOAD LABEL example_db.label3
    (
        DATA INFILE("hdfs://my_ha/user/doris/data/*/*")
        INTO TABLE `my_table`
        COLUMNS TERMINATED BY "\\x01"
    )
    WITH HDFS
    (
        "hadoop.username" = "hive",
        "fs.defaultFS" = "hdfs://my_ha",
        "dfs.nameservices" = "my_ha",
        "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
        "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
        "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
        "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );

    指定分隔符为 Hive 的默认分隔符 \\x01,并使用通配符 * 指定 data 目录下所有目录的所有文件。使用简单认证,同时配置 namenode HA。

  4. 导入 Parquet 格式数据,指定 FORMAT 为 parquet。默认是通过文件后缀判断。

    LOAD LABEL example_db.label4
    (
        DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file")
        INTO TABLE `my_table`
        FORMAT AS "parquet"
        (k1, k2, k3)
    )
    WITH HDFS
    (
        "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
    );
  5. 导入数据,并提取文件路径中的分区字段。

    LOAD LABEL example_db.label10
    (
        DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*")
        INTO TABLE `my_table`
        FORMAT AS "csv"
        (k1, k2, k3)
        COLUMNS FROM PATH AS (city, utc_date)
    )
    WITH HDFS
    (
        "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
    );

    my_table 表中的列为 k1, k2, k3, city, utc_date

    其中 hdfs://hdfs_host:hdfs_port/user/doris/data/input/dir/city=beijing 目录下包括如下文件:

    hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv
    hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv
    hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv
    hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv

    文件中只包含 k1, k2, k3 三列数据,city, utc_date 这两列数据会从文件路径中提取。

  6. 对待导入数据进行过滤。

    LOAD LABEL example_db.label6
    (
        DATA INFILE("hdfs://host:port/input/file")
        INTO TABLE `my_table`
        (k1, k2, k3)
        PRECEDING FILTER k1 = 1
        SET (
            k2 = k2 + 1
        )
        WHERE k1 > k2
    )
    WITH HDFS
    (
        "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
    );

    只有原始数据中,k1 = 1,并且转换后,k1 > k2 的行才会被导入。

  7. 导入数据,提取文件路径中的时间分区字段,并且时间包含 %3A (在 hdfs 路径中,不允许有 ':',所有 ':' 会由 %3A 替换)。

    LOAD LABEL example_db.label7
    (
        DATA INFILE("hdfs://host:port/user/data/*/test.txt")
        INTO TABLE `tbl12`
        COLUMNS TERMINATED BY ","
        (k2,k3)
        COLUMNS FROM PATH AS (data_time)
        SET (
            data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s')
        )
    )
    WITH HDFS
    (
        "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
    );

    路径下有如下文件:

    /user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
    /user/data/data_time=2020-02-18 00%3A00%3A00/test.txt

    表结构为:

    data_time DATETIME,
    k2        INT,
    k3        INT
  8. 从 HDFS 导入一批数据,指定超时时间和过滤比例。并且将原有数据中与 导入数据中 v2 大于 100 的列相匹配的列删除,其他列正常导入。

    LOAD LABEL example_db.label8
    (
        MERGE DATA INFILE("HDFS://test:802/input/file")
        INTO TABLE `my_table`
        (k1, k2, k3, v2, v1)
        DELETE ON v2 > 100
    )
    WITH HDFS
    (
        "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
    )
    PROPERTIES
    (
        "timeout" = "3600",
        "max_filter_ratio" = "0.1"
    );

    使用 MERGE 方式导入。my_table 必须是一张 Unique Key 的表。当导入数据中的 v2 列的值大于 100 时,该行会被认为是一个删除行。

    导入任务的超时时间是 3600 秒,并且允许错误率在 10% 以内。

  9. 导入时指定 source_sequence 列,保证 UNIQUE_KEYS 表中的替换顺序。

    LOAD LABEL example_db.label9
    (
        DATA INFILE("HDFS://test:802/input/file")
        INTO TABLE `my_table`
        COLUMNS TERMINATED BY ","
        (k1,k2,source_sequence,v1,v2)
        ORDER BY source_sequence
    )
    WITH HDFS
    (
        "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
    )

    my_table 必须是 Unqiue Key 模型表,并且指定了 Sequcence Col。数据会按照源数据中 source_sequence 列的值来保证顺序性。

  10. 从 HDFS 导入一批数据,指定文件格式为 json 并指定 json_rootjsonpaths

    LOAD LABEL example_db.label10
    (
        DATA INFILE("HDFS://test:port/input/file.json")
        INTO TABLE `my_table`
        FORMAT AS "json"
        PROPERTIES(
          "json_root" = "$.item",
          "jsonpaths" = "[$.id, $.city, $.code]"
        )
    )
    with HDFS (
        "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
    )
    PROPERTIES
    (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
    );

    jsonpaths 可与 column listSET (column_mapping)配合:

    LOAD LABEL example_db.label10
    (
        DATA INFILE("HDFS://test:port/input/file.json")
        INTO TABLE `my_table`
        FORMAT AS "json"
        (id, code, city)
        SET (id = id * 10)
        PROPERTIES(
          "json_root" = "$.item",
          "jsonpaths" = "[$.id, $.code, $.city]"
        )
    )
    with HDFS (
        "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
    )
    PROPERTIES
    (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
    );

Keywords

BROKER, LOAD

Best Practice

  1. 查看导入任务状态

    Broker Load 是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过 SHOW LOAD 命令查看。

  2. 取消导入任务

    已提交切尚未结束的导入任务可以通过 CANCEL LOAD 命令取消。取消后,已写入的数据也会回滚,不会生效。

  3. Label、导入事务、多表原子性

    Doris 中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,Doris 还可以通过 Label 的机制来保证数据导入的不丢不重。具体说明可以参阅 导入事务和原子性 (opens in a new tab) 文档。

  4. 列映射、衍生列和过滤

    Doris 可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数和 UDF。关于如何正确的使用这个功能,可参阅 列的映射,转换与过滤 (opens in a new tab) 文档。

  5. 错误数据过滤

    Doris 的导入任务可以容忍一部分格式错误的数据。容忍了通过 max_filter_ratio 设置。默认为 0,即表示当有一条错误数据时,整个导入任务将会失败。如果用户希望忽略部分有问题的数据行,可以将次参数设置为 0~1 之间的数值,Doris 会自动跳过哪些数据格式不正确的行。

    关于容忍率的一些计算方式,可以参阅 列的映射,转换与过滤 (opens in a new tab) 文档。

  6. 严格模式

    strict_mode 属性用于设置导入任务是否运行在严格模式下。该格式会对列映射、转换和过滤的结果产生影响。关于严格模式的具体说明,可参阅 严格模式 (opens in a new tab) 文档。

  7. 超时时间

    Broker Load 的默认超时时间为 4 小时。从任务提交开始算起。如果在超时时间内没有完成,则任务会失败。

  8. 数据量和任务数限制

    Broker Load 适合在一个导入任务中导入 100GB 以内的数据。虽然理论上在一个导入任务中导入的数据量没有上限。但是提交过大的导入会导致运行时间较长,并且失败后重试的代价也会增加。

    同时受限于集群规模,我们限制了导入的最大数据量为 ComputeNode 节点数 * 3GB。以保证系统资源的合理利用。如果有大数据量需要导入,建议分成多个导入任务提交。

    Doris 同时会限制集群内同时运行的导入任务数量,通常在 3-10 个不等。之后提交的导入作业会排队等待。队列最大长度为 100。之后的提交会直接拒绝。注意排队时间也被计算到了作业总时间中。如果超时,则作业会被取消。所以建议通过监控作业运行状态来合理控制作业提交频率。

© 2023 北京飞轮数据科技有限公司 京ICP备2022004029号 | Apache、Apache Doris 以及相关开源项目名称均为 Apache 基金会商标