測試環境
元件版本
mysql 5。7
hive 3。1。2
flink 1。12。2
hudi 0。9。0
hadoop 3。2。0
首先請確保以下元件正常啟動:
mysql
hivemetastore
hiveserver2
hdfs
yarn
hudi適配hive 3。1。2原始碼編譯
0。9。0版本的hudi在適配hive3時,其hudi/package/hudi-flink-bundle/pom。xml檔案使用的flink-connector-hive版本有問題,所以需要修改pom檔案。
修改點一:
143行,修改為:
修改點二:
642行,修改為:
編譯命令:
mvn clean install -DskipTests -Pflink-bundle-shade-hive3 -Dhadoop。version=3。2。0 -Dhive。version=3。1。2 -Pinclude-flink-sql-connector-hive -U -Dscala。version=2。12。10 -Dscala。binary。version=2。12
將編譯後得到的hudi/package/hudi-flink-bundle/target/hudi-flink-bundle_2。12-0。9。0。jar複製到flink/lib目錄下,將得到的hudi/package/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0。9。0。jar複製到hive/auxlib目錄下,如果沒有這個目錄則新建一個即可。
關於flink操作hudi的相關方法如果有疑惑的可先看本系列的其他文章,例如:使用flink插入資料到hudi資料湖初探 | 從大資料到人工智慧, Flink SQL Client實戰CDC資料入湖 | 從大資料到人工智慧等。
生成測試資料
使用datafaker生成100000條資料,放到mysql資料庫中的stu4表。
datafaker工具使用方法見datafaker ——- 測試資料生成工具 | 從大資料到人工智慧
首先在mysql中新建表test。stu4
create database test;use test;create table stu4 ( id int unsigned auto_increment primary key COMMENT ‘自增id’, name varchar(20) not null comment ‘學生名字’, school varchar(20) not null comment ‘學校名字’, nickname varchar(20) not null comment ‘學生小名’, age int not null comment ‘學生年齡’, score decimal(4,2) not null comment ‘成績’, class_num int not null comment ‘班級人數’, phone bigint not null comment ‘電話號碼’, email varchar(64) comment ‘家庭網路郵箱’, ip varchar(32) comment ‘IP地址’ ) engine=InnoDB default charset=utf8;
新建meta。txt檔案,檔案內容為:
id||int||自增id[:inc(id,1)]name||varchar(20)||學生名字school||varchar(20)||學校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]nickname||varchar(20)||學生小名[:enum(tom,tony,mick,rich,jasper)]age||int||學生年齡[:age]score||decimal(4,2)||成績[:decimal(4,2,1)]class_num||int||班級人數[:int(10, 100)]phone||bigint||電話號碼[:phone_number]email||varchar(64)||家庭網路郵箱[:email]ip||varchar(32)||IP地址[:ipv4]
生成10000條資料並寫入到mysql中的test。stu3表
datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu4 100000 ——meta meta。txt
datafaker工具有詳細使用方法,請參考。
匯入mysql資料
使用flink sql client進行如下操作
構建源表
create table stu4( id bigint not null, name string, school string, nickname string, age int not null, score decimal(4,2) not null, class_num int not null, phone bigint not null, email string, ip string, PRIMARY KEY (id) NOT ENFORCED) with ( ‘connector’ = ‘jdbc’, ‘url’ = ‘jdbc:mysql://hadoop:3306/test?serverTimezone=GMT%2B8’, ‘username’ = ‘root’, ‘password’ = ‘Pass-123-root’, ‘table-name’ = ‘stu4’);
構建目標表
create table stu4_tmp_1( id bigint not null, name string, `school` string, nickname string, age int not null, score decimal(4,2) not null, class_num int not null, phone bigint not null, email string, ip string, primary key (id) not enforced) partitioned by (`school`) with ( ‘connector’ = ‘hudi’, ‘path’ = ‘hdfs://hadoop:9000/tmp/stu4_tmp_1’, ‘table。type’ = ‘COPY_ON_WRITE’, ‘write。precombine。field’ = ‘school’, ‘hive_sync。enable’ = ‘true’, ‘hive_sync。mode’ = ‘hms’, ‘hive_sync。metastore。uris’ = ‘thrift://hadoop:9083’, ‘hive_sync。jdbc_url’ = ‘jdbc:hive2://hadoop:10000’, ‘hive_sync。table’ = ‘stu_tmp_1’, ‘hive_sync。db’ = ‘test’, ‘hive_sync。username’ = ‘hive’, ‘hive_sync。password’ = ‘hive’ );
插入資料
insert into stu4_tmp_1 select * from stu4;
hive資料查詢
使用hive命令進入hive cli
執行如下命令查詢資料
select * from test。stu_tmp_1 limit 10;
結果: