引言
在大数据生态系统中,数据仓库和实时分析平台扮演着至关重要的角色。随着数据量的爆炸式增长和企业对实时数据分析需求的不断提升,如何高效地在不同的数据存储和处理系统之间迁移数据成为了一个关键问题。Apache Hive
作为数据仓库基础设施,提供了类SQL的查询能力,已成为大数据处理的标准工具之一;而Apache Doris
(原Palo)则是一个现代化的MPP分析型数据库,以其高性能、低延迟和易用性著称,在实时分析场景中表现出色。本文将详细介绍如何将Hive中的数据导入到Doris中,包括使用Catalog
方式和Broker Load
方式,并深入分析各自的优缺点、适用场景以及在实际应用中可能遇到的问题和解决方案。通过本文的指导,读者将能够根据自身业务需求选择最合适的数据导入方案,并成功实现Hive与Doris之间的数据流转。
Hive与Doris简介
Apache Hive
Apache Hive是基于Hadoop构建的数据仓库工具,最初由Facebook开发并于2010年成为Apache顶级项目。它提供了数据汇总、查询和分析功能,将结构化的数据文件映射为一张数据库表,并提供了类SQL的查询语言HiveQL(HQL)。Hive将SQL查询转换为MapReduce、Tez或Spark作业,允许熟悉SQL的分析师查询存储在Hadoop分布式文件系统(HDFS)中的大型数据集,无需编写复杂的MapReduce程序。
Hive的主要特点包括:
提供类SQL查询语言(HQL):HiveQL与标准SQL高度兼容,大大降低了数据分析师的学习成本,使得熟悉SQL的专业人员能够快速上手大数据分析工作。
支持多种数据格式(如TextFile, SequenceFile, ORC, Parquet等):Hive支持多种存储格式,其中ORC和Parquet等列式存储格式能够显著提高查询性能,减少存储空间占用,特别适合分析型工作负载。
可扩展的架构设计:Hive采用插件式架构,用户可以通过自定义SerDe(序列化/反序列化)、UDF(用户定义函数)和UDAF(用户定义聚合函数)来扩展其功能,满足特定的业务需求。
支持用户自定义函数(UDF)和聚合函数(UDAF):Hive允许开发者使用Java、Python等语言编写自定义函数,实现复杂的数据转换和计算逻辑,扩展了Hive的表达能力。
强大的分区和分桶功能:通过分区和分桶,Hive可以显著提高查询性能,特别是对于时间序列数据和大规模数据集的查询场景。
完善的元数据管理:Hive使用Metastore存储表结构、分区信息等元数据,支持多种元数据存储后端,如MySQL、PostgreSQL等。
Apache Doris
Apache Doris(原Palo)是一个现代化的MPP分析型数据库,由百度开源并于2020年捐赠给Apache软件基金会。它专为实时分析而设计,能够在亚秒级内响应复杂的分析查询,特别适合商业智能、实时报表和用户行为分析等场景。
Doris具有以下特点:
高性能的列式存储和向量化执行引擎:Doris采用列式存储格式,能够有效减少I/O开销,提高查询效率。其向量化执行引擎可以一次性处理多行数据,充分利用CPU缓存,显著提升查询性能。
支持实时数据导入和查询:Doris支持多种实时数据导入方式,如Broker Load、Stream Load等,能够实现数据的近实时导入,同时保证查询性能不受影响。
灵活的数据分区和分桶策略:Doris支持多级分区和哈希分桶,用户可以根据业务特点设计最优的数据分布方式,提高查询并发能力。
支持多种数据导入方式:除了Broker Load,Doris还支持Stream Load、Routine Load、Insert Into等多种导入方式,满足不同场景的数据导入需求。
良好的兼容性,支持MySQL协议和标准SQL:Doris兼容MySQL协议,可以使用MySQL客户端或BI工具直接连接,同时支持大部分标准SQL语法,降低了使用门槛。
强大的聚合能力和预计算能力:Doris支持多种聚合模型,如Duplicate Key、Aggregation Key、Unique Key等,能够实现预计算,加速复杂查询。
完善的物化视图功能:Doris支持物化视图,可以预先计算并存储常用查询的结果,进一步提高查询性能。
高可用性和可扩展性:Doris采用Master-Worker架构,支持多Master节点,避免了单点故障问题。同时,Doris支持水平扩展,可以通过增加节点来提升系统处理能力。
使用Catalog方式导入Hive数据到Doris
Catalog简介
Catalog是Doris中用于管理外部数据源元数据的组件,它允许Doris直接访问和查询外部数据源(如Hive、MySQL、Elasticsearch等)中的数据,而无需将数据实际导入到Doris中。这种方式实现了数据的虚拟化,使得用户可以在不移动数据的情况下,跨多个数据源进行联合查询和分析。
Catalog方式的工作原理是:Doris通过Catalog组件连接到外部数据源的元数据服务(如Hive Metastore),获取表结构、分区信息等元数据,然后在查询时直接从外部存储系统(如HDFS)读取数据。这种方式特别适合数据量较大且需要实时查询的场景,因为它避免了数据复制带来的额外存储成本和数据一致性问题。
使用Catalog方式的主要优势包括:
数据实时性:由于数据存储在原始位置,查询结果总是反映最新的数据状态。
存储效率:无需复制数据,节省了存储空间,特别是在处理大规模数据集时优势明显。
数据一致性:避免了数据同步过程中可能出现的一致性问题。
简化架构:减少了ETL流程,简化了数据架构。
联合查询能力:可以同时查询多个外部数据源的数据,实现跨源数据分析。
使用Catalog导入Hive数据的详细步骤
1. 准备工作
在开始使用Catalog方式导入Hive数据之前,需要确保以下条件满足:
确保Doris和Hive集群正常运行:检查Doris FE和BE节点、Hive Metastore和Hadoop集群的状态,确保所有服务正常运行且网络连通。
确保Doris节点可以访问Hive Metastore:验证Doris节点能够访问Hive Metastore服务(默认端口9083),可以使用telnet或nc命令测试连通性:
telnet hive-metastore-host 9083
确保Doris节点有权限访问HDFS上的数据文件:验证Doris运行用户(通常是doris用户)有足够的权限访问HDFS上的数据文件。可以尝试使用hadoop命令测试:
hadoop fs -ls /user/hive/warehouse/your_database/your_table/
检查网络配置:确保Doris节点与Hive Metastore、Hadoop NameNode之间的网络配置正确,防火墙规则允许必要的端口通信。
确认认证方式:根据集群的认证方式(简单认证或Kerberos认证),准备相应的配置。
2. 创建Hive Catalog
在Doris中创建Hive Catalog,指定连接Hive Metastore的相关参数:
-- 基本认证方式
CREATE CATALOG hive_catalog PROPERTIES (
"type" = "hive",
"hive.metastore.uris" = "thrift://hive-metastore:9083"
);
-- Kerberos认证方式
CREATE CATALOG hive_catalog PROPERTIES (
"type" = "hive",
"hive.metastore.uris" = "thrift://hive-metastore:9083",
"hive.metastore.sasl.enabled" = "true",
"hive.metastore.kerberos.principal" = "hive/_HOST@YOUR-REALM",
"hive.metastore.kerberos.keytab" = "/path/to/hive.keytab"
);
-- 高级配置选项
CREATE CATALOG hive_catalog PROPERTIES (
"type" = "hive",
"hive.metastore.uris" = "thrift://hive-metastore:9083",
"hive.metastore.sasl.enabled" = "true",
"hive.metastore.kerberos.principal" = "hive/_HOST@YOUR-REALM",
"hive.metastore.kerberos.keytab" = "/path/to/hive.keytab",
"hive.metastore.execute.setugi" = "true",
"hive.security.authorization.enabled" = "false",
"hive.metastore.token.signature.file" = "/path/to/token-signature"
);
创建Catalog后,可以使用以下命令验证Catalog是否创建成功:
SHOW CATALOGS;
SHOW CREATE CATALOG hive_catalog;
3. 创建外部数据库映射
创建指向Hive中特定数据库的外部数据库映射:
-- 基本映射
CREATE DATABASE hive_db PROPERTIES (
"catalog" = "hive_catalog",
"database" = "your_hive_database"
);
-- 带有过滤条件的映射
CREATE DATABASE hive_db_filtered PROPERTIES (
"catalog" = "hive_catalog",
"database" = "your_hive_database",
"table.filter" = "your_table_prefix_*"
);
-- 映射多个Hive数据库
CREATE DATABASE hive_db_all PROPERTIES (
"catalog" = "hive_catalog",
"database" = "*"
);
创建数据库映射后,可以使用以下命令验证:
SHOW DATABASES;
SHOW CREATE DATABASE hive_db;
4. 创建外部表
创建指向Hive中特定表的外部表,定义表结构和属性:
-- 基本外部表创建
CREATE EXTERNAL TABLE doris_table (
id INT,
name VARCHAR(100),
age INT,
email VARCHAR(100),
create_time DATETIME
) ENGINE=HIVE
PROPERTIES (
"table" = "your_hive_table",
"database" = "your_hive_database"
);
-- 带有列映射的外部表
CREATE EXTERNAL TABLE doris_table_mapped (
user_id INT COMMENT '用户ID',
user_name VARCHAR(100) COMMENT '用户名',
user_age INT COMMENT '用户年龄',
contact_email VARCHAR(100) COMMENT '联系邮箱',
registration_time DATETIME COMMENT '注册时间'
) ENGINE=HIVE
PROPERTIES (
"table" = "your_hive_table",
"database" = "your_hive_database",
"table-type" = "managed_table", -- 或 "external_table"
"file.format" = "ORC" -- 指定文件格式
);
-- 分区表的外部表创建
CREATE EXTERNAL TABLE doris_partitioned_table (
id INT,
name VARCHAR(100),
amount DECIMAL(10,2),
event_date DATE
) ENGINE=HIVE
PARTITION BY COLUMNS (event_date)
PROPERTIES (
"table" = "your_hive_partitioned_table",
"database" = "your_hive_database",
"partition.desc" = "event_date"
);
-- 带有过滤条件的外部表
CREATE EXTERNAL TABLE doris_filtered_table (
id INT,
name VARCHAR(100),
age INT,
status VARCHAR(20)
) ENGINE=HIVE
PROPERTIES (
"table" = "your_hive_table",
"database" = "your_hive_database",
"predicates" = "status = 'active' AND age > 18"
);
创建外部表后,可以使用以下命令验证:
SHOW TABLES FROM hive_db;
SHOW CREATE TABLE doris_table;
DESCRIBE doris_table;
5. 查询数据
创建外部表后,可以直接使用SQL查询Hive中的数据,就像查询普通的Doris表一样:
-- 基本查询
SELECT * FROM doris_table LIMIT 100;
-- 条件查询
SELECT id, name, age FROM doris_table WHERE age > 30 AND status = 'active';
-- 聚合查询
SELECT COUNT(*) AS total_users, AVG(age) AS avg_age FROM doris_table;
-- 分组查询
SELECT status, COUNT(*) AS user_count FROM doris_table GROUP BY status;
-- 排序查询
SELECT * FROM doris_table ORDER BY age DESC LIMIT 10;
-- 连接查询
SELECT a.id, a.name, b.order_count
FROM doris_table a
JOIN doris_order_table b ON a.id = b.user_id;
-- 子查询
SELECT * FROM doris_table WHERE id IN (SELECT user_id FROM doris_order_table WHERE amount > 1000);
-- 分区裁剪
SELECT * FROM doris_partitioned_table WHERE event_date BETWEEN '2023-01-01' AND '2023-01-31';
使用Catalog方式可能遇到的问题
1. 权限问题
问题描述:
Doris节点可能没有足够的权限访问Hive Metastore或HDFS
在Kerberos环境中,认证失败导致无法访问Hive资源
Hive表权限设置导致Doris无法读取数据
解决方案:
确保Doris运行用户有适当的HDFS权限:
# 检查HDFS权限 hadoop fs -ls /user/hive/warehouse/ # 修改HDFS权限 hadoop fs -chown -R doris:hadoop /user/hive/warehouse/your_database/your_table/ hadoop fs -chmod -R 755 /user/hive/warehouse/your_database/your_table/
正确配置Kerberos认证(如果使用):
CREATE CATALOG hive_catalog PROPERTIES ( "type" = "hive", "hive.metastore.uris" = "thrift://hive-metastore:9083", "hive.metastore.sasl.enabled" = "true", "hive.metastore.kerberos.principal" = "hive/_HOST@YOUR-REALM", "hive.metastore.kerberos.keytab" = "/path/to/hive.keytab", "hive.metastore.execute.setugi" = "true" );
在Hive中为Doris用户授予适当的表权限:
-- 在Hive中执行 GRANT SELECT ON TABLE your_database.your_table TO USER 'doris_user'; -- 或者使用角色 CREATE ROLE doris_reader_role; GRANT SELECT ON DATABASE your_database TO ROLE doris_reader_role; GRANT ROLE doris_reader_role TO USER 'doris_user';
配置Hive的授权管理器:
-- 在Hive配置中设置 set hive.security.authorization.manager=org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory; set hive.security.authorization.enabled=true; set hive.security.metastore.authorization.authenticator.manager=org.apache.hadoop.hive.ql.security.HiveDefaultAuthenticator;
2. 性能问题
问题描述:
直接查询外部数据可能导致性能下降,特别是对于复杂查询或大数据集
网络延迟影响查询响应时间
外部存储系统的性能瓶颈
解决方案:
对于频繁查询的数据,考虑导入到Doris中:
-- 创建目标表 CREATE TABLE doris_local_table AS SELECT * FROM doris_external_table; -- 或者使用INSERT INTO INSERT INTO doris_local_table SELECT * FROM doris_external_table;
优化查询语句,减少数据扫描量:
-- 只查询需要的列 SELECT id, name FROM doris_external_table WHERE age > 30; -- 使用分区裁剪 SELECT * FROM doris_partitioned_table WHERE event_date = '2023-01-01'; -- 使用适当的索引 CREATE INDEX idx_age ON doris_external_table(age);
调整Doris配置参数:
-- 设置适当的查询超时时间 SET query_timeout = 300; -- 增加查询内存限制 -- 8GB SET exec_mem_limit = 8589934592; -- 8GB -- 启用向量化执行 SET enable_vectorized_engine = true;
使用Doris的物化视图加速查询:
CREATE MATERIALIZED VIEW mv_user_stats BUILD IMMEDIATE REFRESH COMPLETE ON DEMAND AS SELECT status, COUNT(*) AS user_count, AVG(age) AS avg_age FROM doris_external_table GROUP BY status;
3. 元数据同步问题
问题描述:
Hive表结构变更后,Doris中的元数据不会自动更新
分区信息不同步导致查询失败
表属性变更未反映到Doris外部表
解决方案:
手动刷新元数据:
-- 刷新整个数据库的元数据 REFRESH DATABASE hive_db; -- 刷新特定表的元数据 REFRESH TABLE doris_external_table; -- 刷新分区元数据 REFRESH PARTITION p1 IN TABLE doris_partitioned_table;
使用定时任务定期同步元数据:
# 创建cron任务,每天凌晨2点刷新元数据 0 2 * * * /opt/doris/bin/refresh_metadata.sh
在Doris外部表创建时设置自动刷新
EATE EXTERNAL TABLE doris_auto_refresh_table ( id INT, name VARCHAR(100), age INT ) ENGINE=HIVE PROPERTIES ( "table" = "your_hive_table", "database" = "your_hive_database", "auto.refresh" = "true", "auto.refresh.interval" = "3600" -- 每小时刷新一
监控元数据变更并触发刷新:
- 创建事件监控Hive表变更 CREATE EVENT monitor_hive_table_change ON SCHEDULE EVERY 1 HOUR DO CALL refresh_hive_table_metadata('your_database.your_table')
刷新整个catalog:
refresh catalog hive
4. 数据格式兼容性问题
问题描述:
某些Hive特有的数据类型或格式在Doris中可能不支持
数据类型映射不正确导致查询结果异常
复杂的数据结构(如数组、映射)处理困难
解决方案:
在创建外部表时进行适当的类型映射:
-- Hive数据类型到Doris数据类型的映射 CREATE EXTERNAL TABLE doris_type_mapped_table ( tiny_col TINYINT, -- Hive: tinyint small_col SMALLINT, -- Hive: smallint int_col INT, -- Hive: int bigint_col BIGINT, -- Hive: bigint boolean_col BOOLEAN, -- Hive: boolean float_col FLOAT, -- Hive: float double_col DOUBLE, -- Hive: double string_col VARCHAR(1000), -- Hive: string varchar_col VARCHAR(100), -- Hive: varchar char_col CHAR(10), -- Hive: char date_col DATE, -- Hive: date datetime_col DATETIME, -- Hive: timestamp decimal_col DECIMAL(10,2) -- Hive: decimal ) ENGINE=HIVE PROPERTIES ( "table" = "your_hive_table", "database" = "your_hive_database" );
处理复杂类型:
-- 对于Hive的数组类型,可以使用JSON字符串表示 CREATE EXTERNAL TABLE doris_array_table ( id INT, tags VARCHAR(1000) -- 存储JSON格式的数组 ) ENGINE=HIVE PROPERTIES ( "table" = "your_hive_array_table", "database" = "your_hive_database" ); -- 查询时解析JSON SELECT id, JSON_ARRAY(tags) AS tag_array, JSON_LENGTH(tags) AS tag_count FROM doris_array_table;
处理NULL值和默认值:
CREATE EXTERNAL TABLE doris_null_handling_table ( id INT, name VARCHAR(100), age INT DEFAULT 0, status VARCHAR(20) DEFAULT 'unknown' ) ENGINE=HIVE PROPERTIES ( "table" = "your_hive_table", "database" = "your_hive_database", "null.if" = "\\N", -- 指定NULL值的表示方式 "default.value" = "unknown" -- 指定默认值 );
使用视图简化复杂查询:
CREATE VIEW doris_complex_view AS SELECT id, name, CASE WHEN age < 18 THEN 'minor' WHEN age BETWEEN 18 AND 65 THEN 'adult' ELSE 'senior' END AS age_group, CONCAT(first_name, ' ', last_name) AS full_name FROM doris_external_table;
Broker Load方式导入Hive数据到Doris
为什么需要Broker Load
当使用Catalog方式导入大量数据时,可能会遇到内存不足的问题。这是因为Catalog方式需要在内存中处理整个数据集,对于TB级别的数据集,这会导致内存溢出或查询性能急剧下降。此外,Catalog方式依赖于外部存储系统的性能,如果HDFS或对象存储系统出现性能瓶颈,也会影响查询效率。
Broker Load是Doris提供的一种异步导入方式,它通过Broker进程将数据从HDFS导入到Doris,避免了内存限制问题。Broker Load将数据导入过程分解为多个阶段,包括数据读取、转换、加载等,每个阶段都可以独立优化和扩展。这种方式特别适合大数据量的批量导入场景,能够充分利用集群资源,实现高效的数据迁移。
使用Broker Load的主要优势包括:
异步导入,不阻塞查询:Broker Load是异步执行的,不会影响Doris集群的正常查询操作,用户可以继续执行查询任务。
支持大数据量导入:Broker Load能够处理TB级别的数据集,通过并行读取和加载,充分利用集群资源。
支持多种数据格式:Broker Load支持ORC、Parquet、CSV等多种数据格式,满足不同场景的数据导入需求。
支持导入过程中的错误处理和重试:Broker Load提供了完善的错误处理机制,能够自动重试失败的导入任务,提高数据导入的可靠性。
支持数据转换和过滤:在导入过程中,可以进行数据清洗、转换和过滤,减少存储空间占用,提高数据质量。
支持增量导入:通过指定导入路径或时间范围,可以实现增量导入,只导入新增或变更的数据。
Broker的安装部署配置
1. 下载Broker包
从Doris官方仓库下载Broker包,根据Doris版本选择对应的Broker版本:
# 下载最新稳定版(doris2 之后的版本会自带Broker进程)
wget https://github.com/apache/doris/releases/download/3.0.8/apache-doris-3.0.8-bin.tar.gz
# 或者下载特定版本(也可以从下载的Doris程序包里面能找到apache_hdfs_broker 或者其他名称)
wget https://github.com/apache/doris/releases/download/1.2.4/apache-doris-1.2.4-incubating-bin.tar.gz
# 验证下载文件的完整性
sha256sum apache-doris-1.2.4-bin.tar.gz
2. 解压并配置Broker
如果不想这么麻烦的配置,也可以直接跳过 2、3、4步骤,直接进行第五步(前提是启动Broker服务),然后在执行
LOAD LABEL ...
的时候配置部分你想要的配置,1.2的官方文档 Apache Doris - 归档文档中心 - Apache Doris, Apache Doris V1.2 中文手册
# 创建目录
mkdir -p /opt/doris/broker
cd /opt/doris/broker
# 解压
tar -xzf /path/to/apache-doris-2.0.8-bin.tar.gz
# 进入配置目录
cd apache-doris-2.0.8-bin/fe/conf
# 复制配置模板
cp broker.conf.template broker.conf
# 编辑配置文件
vi broker.conf
3. 修改broker.conf配置
根据实际环境修改broker.conf配置文件:
# Broker服务端口
broker_port = 8000
# Broker工作目录,用于存储临时文件
broker_work_dir = /opt/doris/broker/work
# 日志目录
broker_log_dir = /opt/doris/broker/logs
# Broker进程数,通常设置为CPU核心数
broker_num = 4
# JVM内存配置,根据数据量调整
broker_memory_limit = 4g
# HDFS配置
broker.hadoop.security.authentication = simple
# 如果使用Kerberos认证
broker.hadoop.kerberos.principal = ""
broker.hadoop.kerberos.keytab = ""
# HDFS NameNode地址
broker.hadoop.hdfs.namenode = hdfs://namenode:8020
# HDFS用户
broker.hadoop.hdfs.user = doris
# S3配置(如果使用)
broker.s3.access_key = ""
broker.s3.secret_key = ""
broker.s3.region = ""
broker.s3.bucket = ""
# 并行读取线程数
broker.read_parallel = 4
# 压缩配置
broker.compression.codec = lz4
broker.compression.level = 1
# 超时配置(秒)
broker.timeout = 3600
4. 启动Broker服务
# 进入Broker lib目录
cd /opt/doris/broker/apache-doris-2.0.8-bin/be/lib
# 启动Broker服务
nohup java -Xmx4g -Xms4g -XX:MaxDirectMemorySize=4g -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=4 -XX:ConcGCThreads=4 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/doris/broker/logs/java.hprof -jar broker.jar --config_file=/opt/doris/broker/apache-doris-2.0.8-bin/fe/conf/broker.conf > /opt/doris/broker/logs/broker.log 2>&1 &
# 检查Broker进程
ps aux | grep broker
# 检查Broker日志
tail -f /opt/doris/broker/logs/broker.log
5. 在Doris中注册Broker
-- 添加 Broker节点
ALTER SYSTEM ADD BROKER broker_name "broker_host1:8000,broker_host2:8000,broker_host3:8000";
-- 查看Broker状态
SHOW BROKER;
-- 测试Broker连接
SHOW PROC '/brokers';
使用Broker Load导入Hive数据的步骤
1. 准备数据
确保HDFS上有需要导入的数据文件,并检查文件格式和内容:
# 上传数据到HDFS
hadoop fs -put /path/to/local/data.csv /user/hive/warehouse/your_database/your_table/
# 检查文件
hadoop fs -ls /user/hive/warehouse/your_database/your_table/
# 查看文件内容
hadoop fs -cat /user/hive/warehouse/your_database/your_table/data.csv | head -10
# 检查文件格式
file -b /path/to/local/data.csv
对于ORC或Parquet格式的文件,可以使用专门的工具进行检查:
# 使用ORC工具检查ORC文件
java -jar orc-tools-1.6.7.jar -d /user/hive/warehouse/your_database/your_table/data.orc
# 使用Parquet工具检查Parquet文件
parquet-tools schema /user/hive/warehouse/your_database/your_table/data.parquet
2. 创建Doris目标表
根据Hive表结构创建Doris目标表,选择合适的聚合模型:
-- 创建Duplicate Key模型表(适用于明细查询)
CREATE TABLE doris_duplicate_key_table (
id INT,
name VARCHAR(100),
age INT,
email VARCHAR(100),
create_time DATETIME
) ENGINE=OLAP
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES (
"replication_num" = "3",
"storage_medium" = "SSD"
);
-- 创建Aggregation Key模型表(适用于聚合查询)
CREATE TABLE doris_aggregation_key_table (
user_id INT,
user_name VARCHAR(100),
login_count BIGINT SUM, -- 聚合函数
total_amount DECIMAL(10,2) SUM,
last_login_time DATETIME MAX
) ENGINE=OLAP
AGGREGATE KEY(user_id, user_name)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES (
"replication_num" = "3",
"storage_medium" = "SSD"
);
-- 创建Unique Key模型表(适用于主键唯一场景)
CREATE TABLE doris_unique_key_table (
order_id BIGINT,
user_id INT,
product_id INT,
quantity INT,
price DECIMAL(10,2),
create_time DATETIME
) ENGINE=OLAP
UNIQUE KEY(order_id)
DISTRIBUTED BY HASH(order_id) BUCKETS 10
PROPERTIES (
"replication_num" = "3",
"storage_medium" = "SSD"
);
-- 创建分区表(适用于时间序列数据)
CREATE TABLE doris_partitioned_table (
id INT,
name VARCHAR(100),
amount DECIMAL(10,2),
event_date DATE,
event_hour INT
) ENGINE=OLAP
PARTITION BY RANGE(event_date)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES (
"replication_num" = "3",
"storage_medium" = "SSD"
);
3. 提交Broker Load作业
根据数据格式和导入需求,提交Broker Load作业:
-- 导入CSV格式数据
LOAD LABEL example_db.example_label_csv
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_duplicate_key_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age, email, create_time)
WHERE age > 18
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "8589934592", -- 8GB
"load_parallelism" = "3"
);
-- 示例
LOAD LABEL gmall.label2025092413425005
(
DATA INFILE("hdfs:/dmp-prod1:8020/warehouse/gmall/ods/bijie_ods_gzbj_forestry_lcstzhjccg/qxg2023lcstzhjccg.txt")
INTO TABLE `bijie_ods_gzbj_forestry_lcstzhjccg`
COLUMNS TERMINATED BY "\t"
)
WITH HDFS
(
"fs.defaultFS"="hdfs:/dmp-prod1:8020",
"hadoop.username" = "rongxin"
)
PROPERTIES( "timeout"="14400", "load_parallelism" = "3" );
-- 导入ORC格式数据
LOAD LABEL example_db.example_label_orc
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.orc")
INTO TABLE doris_aggregation_key_table
FORMAT AS "ORC"
COLUMNS(user_id, user_name, login_count, total_amount, last_login_time)
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "7200",
"max_filter_ratio" = "0.05",
"exec_mem_limit" = "17179869184", -- 16GB
"load_parallelism" = "5"
);
-- 导入Parquet格式数据
LOAD LABEL example_db.example_label_parquet
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.parquet")
INTO TABLE doris_unique_key_table
FORMAT AS "PARQUET"
COLUMNS(order_id, user_id, product_id, quantity, price, create_time)
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "5400",
"max_filter_ratio" = "0.02",
"exec_mem_limit" = "34359738368", -- 32GB
"load_parallelism" = "8"
);
-- 导入分区表数据
LOAD LABEL example_db.example_label_partitioned
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*")
INTO TABLE doris_partitioned_table
PARTITION BY (event_date)
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, amount, event_date, event_hour)
WHERE event_date >= '2023-01-01'
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "10800", -- 3小时
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "8589934592", -- 8GB
"load_parallelism" = "10"
);
-- 导入多个文件到多个表
LOAD LABEL example_db.example_label_multi
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table1/*.csv")
INTO TABLE doris_table1
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age),
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table2/*.csv")
INTO TABLE doris_table2
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(order_id, user_id, amount, create_time)
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "7200",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "17179869184" -- 16GB
);
4. 监控导入状态
-- 查看所有导入任务
SHOW LOAD FROM example_db;
-- 查看特定标签的导入任务
SHOW LOAD FROM example_db WHERE LABEL = "example_label_csv";
-- 查看导入任务的详细信息
SHOW LOAD WHERE LABEL = "example_label_csv" \G
-- 查看导入任务的进度
SHOW PROC '/current_load_tasks' WHERE Label = "example_label_csv";
-- 查看导入任务的错误信息
SHOW LOAD WHERE LABEL = "example_label_csv" AND State = "CANCELLED" \G
-- 取消导入任务
CANCEL LOAD FROM example_db WHERE LABEL = "example_label_csv";
-- 重试失败的导入任务
LOAD LABEL example_db.example_label_csv
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_duplicate_key_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age, email, create_time)
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "8589934592",
"load_parallelism" = "3",
"strict_mode" = "true" -- 严格模式,遇到错误立即失败
);
Broker Load的必要条件
1. Broker服务正常运行
Broker服务必须在所有Doris BE节点上运行:确保Broker服务在所有Doris BE节点上正常运行,可以使用以下命令检查:
# 检查Broker进程
ps aux | grep broker
# 检查Broker端口
netstat -tlnp | grep 8000
# 检查Broker日志
tail -f /opt/doris/broker/logs/broker.log
Broker服务必须能够访问HDFS或其他存储系统:验证Broker服务能够访问HDFS或其他存储系统:
# 使用Broker测试HDFS访问
java -jar /opt/doris/broker/apache-doris-2.0.8-bin/be/lib/broker.jar \
--config_file=/opt/doris/broker/apache-doris-2.0.8-bin/fe/conf/broker.conf \
--test_hdfs hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/
Broker服务配置必须与Doris集群配置匹配:确保Broker服务的配置参数与Doris集群的配置参数一致,特别是HDFS相关配置。
2. 正确的权限配置
Doris必须有权限访问HDFS上的数据文件:验证Doris有足够的权限访问HDFS上的数据文件:
# 检查HDFS权限
hadoop fs -ls /user/hive/warehouse/your_database/your_table/
# 修改HDFS权限
hadoop fs -chown -R doris:hadoop /user/hive/warehouse/your_database/your_table/
hadoop fs -chmod -R 755 /user/hive/warehouse/your_database/your_table/
如果使用Kerberos认证,必须正确配置相关参数:在Kerberos环境中,必须正确配置以下参数:
CREATE CATALOG hive_catalog PROPERTIES (
"type" = "hive",
"hive.metastore.uris" = "thrift://hive-metastore:9083",
"hive.metastore.sasl.enabled" = "true",
"hive.metastore.kerberos.principal" = "hive/_HOST@YOUR-REALM",
"hive.metastore.kerberos.keytab" = "/path/to/hive.keytab"
);
ALTER SYSTEM ADD BROKER broker_name "broker_host:8000" PROPERTIES (
"broker.hadoop.security.authentication" = "kerberos",
"broker.hadoop.kerberos.principal" = "hdfs/_HOST@YOUR-REALM",
"broker.hadoop.kerberos.keytab" = "/path/to/hdfs.keytab"
);
确保Doris用户有执行Broker Load的权限:验证Doris用户有执行Broker Load的权限:
-- 检查用户权限
SHOW GRANT FOR CURRENT_USER();
-- 授予LOAD权限
GRANT LOAD ON DATABASE example_db TO USER 'doris_user';
-- 授管Broker权限
GRANT ADMIN_PRIV TO USER 'doris_user';
3. 适当的数据格式
支持的数据格式包括ORC、Parquet、CSV等:Broker Load支持多种数据格式,包括:
ORC (Optimized Row Columnar)
Parquet
CSV (Comma-Separated Values)
JSON
Avro
确保数据格式与表结构匹配:在创建导入任务时,确保数据格式与表结构匹配:
-- 对于ORC格式
LOAD LABEL example_db.example_label_orc
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.orc")
INTO TABLE doris_table
FORMAT AS "ORC"
COLUMNS(id, name, age)
)
WITH BROKER 'broker_name';
-- 对于Parquet格式
LOAD LABEL example_db.example_label_parquet
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.parquet")
INTO TABLE doris_table
FORMAT AS "PARQUET"
COLUMNS(id, name, age)
)
WITH BROKER 'broker_name';
-- 对于CSV格式
LOAD LABEL example_db.example_label_csv
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age)
)
WITH BROKER 'broker_name';
处理复杂的数据类型:对于复杂的数据类型(如日期、时间戳、 decimal等),确保正确映射:
-- 处理日期和时间戳
LOAD LABEL example_db.example_label_datetime
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, birth_date DATE, create_time DATETIME)
)
WITH BROKER 'broker_name';
-- 处理Decimal类型
LOAD LABEL example_db.example_label_decimal
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, amount DECIMAL(10,2), price DECIMAL(12,4))
)
WITH BROKER 'broker_name';
4. 足够的存储空间
Doris集群必须有足够的存储空间接收导入的数据:在执行Broker Load之前,确保Doris集群有足够的存储空间:
-- 查看集群存储使用情况
SHOW PROC '/cluster_info' \G
-- 查看BE节点存储使用情况
SHOW PROC '/backends' \G
-- 查看表存储使用情况
SHOW TABLET FROM example_db.doris_table;
考虑副本数量对存储空间的需求:Doris默认创建3个副本,因此在计算存储需求时需要考虑副本数量:
-- 计算表所需存储空间
SELECT
table_name,
table_size,
table_size * 3 AS total_size_with_replicas,
table_data_size,
table_index_size
FROM information_schema.tables
WHERE table_schema = 'example_db';
监控存储空间使用情况:定期监控存储空间使用情况,避免因空间不足导致导入失败:
-- 设置存储空间告警
CREATE ALERT storage_usage_alert
SCHEDULE EVERY 1 HOUR
DO
SELECT
'Storage usage is above 80%' AS alert_message,
NOW() AS alert_time
FROM information_schema.tables
WHERE table_schema = 'example_db'
AND table_size / (SELECT table_size FROM information_schema.tables WHERE table_schema = 'example_db' AND table_name = 'doris_table') > 0.8;
最新版与Doris-1.2.4的区别
随着Doris版本的不断更新,其Broker Load功能也在持续增强。最新版与1.2.4相比有以下主要区别:
1. Broker功能增强
支持更多的存储系统:最新版Doris支持更多的存储系统,包括:
阿里云OSS
腾讯云COS
华为云OBS
AWS S3
Google Cloud Storage
Azure Blob Storage
配示例:
-- 配置阿里云OSS
ALTER SYSTEM ADD BROKER oss_broker "broker_host:8000" PROPERTIES (
"broker.default.buckets" = "your-bucket",
"broker.default.endpoint" = "oss-cn-hangzhou.aliyuncs.com",
"broker.default.access_key" = "your-access-key",
"broker.default.secret_key" = "your-secret-key"
);
-- 使用OSS导入数据
LOAD LABEL example_db.example_label_oss
(
DATA INFILE("oss://your-bucket/path/to/data/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age)
)
WITH BROKER 'oss_broker';
支持更细粒度的权限控制:最新版提供了更细粒度的权限控制,可以控制用户对特定Broker的访问权限:
-- 授予用户使用特定Broker的权限
GRANT BROKER_PRIV ON BROKER 'broker_name' TO USER 'doris_user';
-- 撤销Broker权限
REVOKE BROKER_PRIV ON BROKER 'broker_name' FROM USER 'doris_user';
改进了错误处理和重试机制:最新版改进了错误处理和重试机制,提高了数据导入的可靠性:
-- 配置重试策略
LOAD LABEL example_db.example_label_retry
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age)
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "8589934592",
"max_retries" = "3", -- 最大重试次数
"retry_interval" = "60" -- 重试间隔(秒)
);
2. 性能优化
最新版的Broker Load性能显著提升:特别是在处理大数据量时,最新版的Broker Load性能显著提升:
-- 配置并行度
LOAD LABEL example_db.example_label_parallel
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age)
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "8589934592",
"load_parallelism" = "16", -- 提高并行度
"pipeline_parallelism" = "4" -- 管道并行度
);
优化了内存使用:最新版优化了内存使用,减少了内存溢出的风险:
-- 配置内存限制
LOAD LABEL example_db.example_label_memory
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age)
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "17179869184", -- 16GB
"memory_limit_per_task" = "4294967296", -- 4GB per task
"spill_threshold" = "1073741824" -- 1GB spill threshold
);
3. 新特性
支持增量导入功能:最新版支持增量导入功能,可以只导入新增或变更的数据:
-- 基于时间戳的增量导入
LOAD LABEL example_db.example_label_incremental
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age, update_time)
WHERE update_time >= '2023-01-01 00:00:00' AND update_time < '2023-01-02 00:00:00'
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "8589934592",
"is_incremental" = "true" -- 标记为增量导入
);
-- 基于文件修改时间的增量导入
LOAD LABEL example_db.example_label_incremental_mtime
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age)
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "8589934592",
"is_incremental" = "true",
"incremental_by_mtime" = "true" -- 基于文件修改时间
);
支持导入过程中的数据转换:最新版支持导入过程中的数据转换,可以在导入时进行数据清洗和转换:
-- 导入时进行数据转换
LOAD LABEL example_db.example_label_transform
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(
id,
name,
age,
-- 转换列
upper(name) AS name_upper,
CASE
WHEN age < 18 THEN 'minor'
WHEN age BETWEEN 18 AND 65 THEN 'adult'
ELSE 'senior'
END AS age_group,
-- 计算列
age * 365 AS age_in_days
)
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "8589934592"
);
支持更复杂的导入策略:最新版支持更复杂的导入策略,如条件导入、部分导入等:
-- 条件导入
LOAD LABEL example_db.example_label_conditional
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age, status)
WHERE status IN ('active', 'pending') AND age > 18
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "8589934592"
);
-- 部分导入(只导入部分列)
LOAD LABEL example_db.example_label_partial
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age) -- 只导入部分列
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "8589934592"
);
4. 兼容性改进
最新版对Hive版本的兼容性更好:最新版Doris对Hive版本的兼容性更好,支持Hive 2.x和3.x版本:
-- 配置Hive Metastore版本
CREATE CATALOG hive_catalog PROPERTIES (
"type" = "hive",
"hive.metastore.uris" = "thrift://hive-metastore:9083",
"hive.metastore.version" = "3.1.2", -- 指定Hive版本
"hive.metastore.sasl.enabled" = "true",
"hive.metastore.kerberos.principal" = "hive/_HOST@YOUR-REALM",
"hive.metastore.kerberos.keytab" = "/path/to/hive.keytab"
);
支持更多Hive数据类型和格式:最新版支持更多Hive数据类型和格式,包括:
Hive 3.0+的新数据类型(如TIMESTAMP_WITH_TIMEZONE)
复杂类型(ARRAY, MAP, STRUCT)
自定义SerDe
示例:
-- 处理Hive复杂类型
CREATE EXTERNAL TABLE doris_complex_types_table (
id INT,
tags ARRAY<VARCHAR(100)>, -- 数组类型
properties MAP<VARCHAR(100), VARCHAR(100)>, -- 映射类型
address STRUCT<street:VARCHAR(100), city:VARCHAR(100), zip:VARCHAR(10)> -- 结构体类型
) ENGINE=HIVE
PROPERTIES (
"table" = "your_hive_complex_table",
"database" = "your_hive_database"
);
-- 查询复杂类型
SELECT
id,
array_join(tags, ',') AS tags_str,
map_keys(properties) AS property_keys,
address.city AS city
FROM doris_complex_types_table;
5. Bug修复
修复了1.2.4版本中的多个已知问题:最新版修复了1.2.4版本中的多个已知问题,包括:
内存泄漏问题
并度导入时的数据不一致问题
特定数据格式解析错误
Kerberos认证失败问题
提高了系统的稳定性和可靠性:通过修复这些Bug,最新版Doris的稳定性和可靠性得到了显著提高:
-- 配置稳定性相关参数
LOAD LABEL example_db.example_label_stable
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age)
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "8589934592",
"strict_mode" = "true", -- 严格模式
"timeout_second" = "7200", -- 增加超时时间
"send_batch_parallelism" = "4" -- 发送批次并行度
);
最佳实践
1. 根据数据量选择合适的导入方式
小数据量(GB级别):使用Catalog方式,实现实时查询
-- 创建Catalog
CREATE CATALOG hive_catalog PROPERTIES (
"type" = "hive",
"hive.metastore.uris" = "thrift://hive-metastore:9083"
);
-- 创建外部表
CREATE EXTERNAL TABLE doris_external_table (
id INT,
name VARCHAR(100),
age INT
) ENGINE=HIVE
PROPERTIES (
"table" = "your_hive_table",
"database" = "your_hive_database"
);
-- 直接查询
SELECT * FROM doris_external_table WHERE age > 30;
中等数据量(10GB-100GB):使用Broker Load,但设置较小的并行度
-- 使用Broker Load导入中等数据量
LOAD LABEL example_db.example_label_medium
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age)
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"exec_mem_limit" = "8589934592", -- 8GB
"load_parallelism" = "4" -- 适中的并行度
);
大数据量(100GB以上):使用Broker Load,设置较大的并行度和内存限制,数据量达到TB,可考虑使用
Spark LOAD
-- 使用Broker Load导入大数据量
LOAD LABEL example_db.example_label_large
(
DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/your_database/your_table/*.csv")
INTO TABLE doris_table
FORMAT AS "CSV"
COLUMNS TERMINATED BY ","
COLUMNS(id, name, age)
)
WITH BROKER 'broker_name'
PROPERTIES
(
"timeout" = "7200", -- 2小时
"max_filter_ratio" = "0.05",
"exec_mem_limit" = "34359738368", -- 32GB
"load_parallelism" = "16" -- 高并行度
);
实时数据流:考虑使用Stream Load或Routine Load
-- 创建流导入任务
CREATE ROUTINE LOAD example_db.routine_load_example
ON doris_stream_table
COLUMNS(id, name, age),
COLUMNS(kafka_key, kafka_timestamp, kafka_topic, kafka_partition, kafka_offset)
FROM KAFKA
(
"kafka_bootstrap_servers" = "kafka1:9092,kafka2:9092",
"kafka_topic" = "your_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "OFFSET_BEGINNING"
)
PROPERTIES
(
"format" = "json",
"json_root" = "$.data",
"jsonpaths" = "$.id,$.name,$.age"
);
2. 合理配置Broker参数
根据数据量调整超时时间:根据数据量大小合理设置超时时间
-- 小数据量(GB级别)
"timeout" = "1800" -- 30分钟
-- 中等数据量(10GB-100GB)
"timeout" = "3600" -- 1小时
-- 大数据量(100GB以上)
"timeout" = "7200" -- 2小时
-- 超大数据量(1TB以上)
"timeout" = "21600" -- 6小时
设置合理的错误容忍比例:根据数据质量设置错误容忍比例
-- 高质量数据
"max_filter_ratio" = "0.01" -- 1%错误容忍
-- 中等质量数据
"max_filter_ratio" = "0.05" -- 5%错误容忍
-- 低质量数据
"max_filter_ratio" = "0.1" -- 10%错误容忍
-- 极低质量数据
"max_filter_ratio" = "0.2" -- 20%错误容忍
优化内存使用:根据集群资源合理设置内存限制
-- 小集群(8GB内存)
"exec_mem_limit" = "4294967296" -- 4GB
-- 中等集群(32GB内存)
"exec_mem_limit" = "17179869184" -- 16GB
-- 大集群(64GB内存)
"exec_mem_limit" = "34359738368" -- 32GB
-- 超大集群(128GB内存)
"exec_mem_limit" = "68719476736" -- 64GB
调整并行度:根据CPU核心数和数据量调整并行度
-- 小数据量
"load_parallelism" = "2"
-- 中等数据量
"load_parallelism" = "4"
-- 大数据量
"load_parallelism" = "8"
-- 超大数据量
"load_parallelism" = "16"
3. 监控导入过程
定期检查导入状态:定期检查导入任务的状态,确保数据正常导入
-- 查看所有导入任务
SHOW LOAD FROM example_db;
-- 查看特定标签的导入任务
SHOW LOAD FROM example_db WHERE LABEL = "example_label";
-- 查看导入任务的详细信息
SHOW LOAD WHERE LABEL = "example_label" \G
-- 查看导入任务的进度
SHOW PROC '/current_load_tasks' WHERE Label = "example_label";
设置导入任务告警:设置导入任务告警,及时发现和处理导入失败的情况
-- 创建导入失败告警
CREATE ALERT load_failure_alert
SCHEDULE EVERY 5 MINUTE
DO
SELECT
'Load task failed: ' || LABEL AS alert_message,
State AS state,
Progress AS progress,
EtlInfo AS etl_info,
TxnId AS txn_id,
CreateTime AS create_time,
FinishTime AS finish_time,
UnselectedRows AS unselected_rows,
DeadRows AS dead_rows,
LoadRows AS load_rows,
FilteredRows AS filtered_rows,
SourceRows AS source_rows,
JobStatus AS job_status
FROM information_schema.loads
WHERE State = 'CANCELLED'
AND CreateTime >= NOW() - INTERVAL 1 HOUR;
监控导入性能:监控导入性能指标,优化导入参数
-- 查看导入性能指标
SELECT
LABEL,
State,
Progress,
EtlInfo,
TxnId,
CreateTime,
FinishTime,
UnselectedRows,
DeadRows,
LoadRows,
FilteredRows,
SourceRows,
JobStatus,
Cast((LoadRows + FilteredRows + DeadRows) * 100.0 / NULLIF(SourceRows, 0) AS DECIMAL(10,2)) AS load_rate,
TIMESTAMPDIFF(SECOND, CreateTime, FinishTime) AS duration_seconds
FROM information_schema.loads
WHERE CreateTime >= NOW() - INTERVAL 1 DAY
ORDER BY CreateTime DESC;
分析导入失败原因:分析导入失败的原因,采取相应的解决措施
-- 查看导入失败详情
SELECT
LABEL,
State,
EtlInfo,
TxnId,
CreateTime,
FinishTime,
UnselectedRows,
DeadRows,
LoadRows,
FilteredRows,
SourceRows,
JobStatus,
ErrorMsg AS error_message
FROM information_schema.loads
WHERE State = 'CANCELLED'
AND CreateTime >= NOW() - INTERVAL 1 DAY;
-- 分析错误类型
SELECT
ErrorMsg AS error_message,
COUNT(*) AS error_count
FROM information_schema.loads
WHERE State = 'CANCELLED'
AND CreateTime >= NOW() - INTERVAL 1 DAY
GROUP BY ErrorMsg
ORDER BY error_count DESC;
4. 定期维护元数据
保持Hive和Doris元数据的一致性:定期同步Hive和Doris的元数据,确保数据结构一致
-- 刷新整个数据库的元数据
REFRESH DATABASE hive_db;
-- 刷新特定表的元数据
REFRESH TABLE doris_external_table;
-- 刷新分区元数据
REFRESH PARTITION p1 IN TABLE doris_partitioned_table;
定期清理不再需要的导入任务:定期清理不再需要的导入任务,避免元数据表过大
-- 查看旧的导入任务
SELECT
LABEL,
State,
CreateTime,
FinishTime,
TIMESTAMPDIFF(DAY, FinishTime, NOW()) AS days_ago
FROM information_schema.loads
WHERE FinishTime <= NOW() - INTERVAL 7 DAY
ORDER BY FinishTime DESC;
-- 清理旧的导入任务
DELETE FROM information_schema.loads
WHERE FinishTime <= NOW() - INTERVAL 7 DAY;
优化表结构:根据查询需求优化表结构,提高查询性能
-- 添加合适的索引
CREATE INDEX idx_age ON doris_table(age);
CREATE INDEX idx_name ON doris_table(name);
-- 修改表属性
ALTER TABLE doris_table SET ("replication_num" = "3");
ALTER TABLE doris_table SET ("storage_medium" = "SSD");
-- 修改分区策略
ALTER TABLE doris_table ADD PARTITION p202301 VALUES LESS THAN ('2023-02-01');
定期统计信息收集:定期收集表的统计信息,优化查询计划
-- 收集单表统计信息
ANALYZE TABLE doris_table COMPUTE STATISTICS;
-- 收集列统计信息
ANALYZE TABLE doris_table COMPUTE STATISTICS FOR COLUMNS id, name, age;
-- 收集分区统计信息
ANALYZE TABLE doris_table PARTITION (p202301) COMPUTE STATISTICS;
总结
本文详细介绍了两种将Hive数据导入到Doris的方法:Catalog方式和Broker Load方式,并深入探讨了各自的优缺点、适用场景以及在实际应用中可能遇到的问题和解决方案。
Catalog方式是一种虚拟化的数据访问方式,它允许Doris直接查询Hive中的数据,而无需实际复制数据。这种方式特别适合小数据量和实时查询场景,因为它提供了数据的实时性、存储效率和数据一致性等优势。然而,Catalog方式也存在一些局限性,如性能问题、元数据同步问题和数据格式兼容性问题等,需要通过适当的配置和优化来解决。
Broker Load是一种异步的数据导入方式,它通过Broker进程将数据从HDFS导入到Doris,避免了内存限制问题。这种方式特别适合大数据量导入场景,因为它支持异步导入、大数据量处理、多种数据格式和完善的错误处理机制等。Broker Load需要正确配置Broker服务、权限设置、数据格式和存储空间等条件,才能确保数据导入的顺利进行。
在实际应用中,应根据具体需求选择合适的导入方式。对于小数据量和实时查询需求,Catalog方式是更好的选择;对于大数据量导入需求,Broker Load则更为合适。无论选择哪种方式,都需要注意配置优化、问题排查和定期维护等工作,以确保数据导入的高效和可靠。
随着Doris版本的不断更新,其导入功能也在持续增强。最新版Doris提供了更多的存储系统支持、更细粒度的权限控制、更完善的错误处理机制、更优化的性能表现以及更多的新特性,如增量导入、数据转换和复杂导入策略等。建议用户关注Doris官方文档,及时了解最新功能,并根据实际需求选择合适的版本。
通过本文的指导,读者应该能够根据自身业务需求选择最合适的数据导入方案,并成功实现Hive与Doris之间的数据流转,为企业的数据分析和决策提供有力的支持。
评论区