使用flink SQL Client將mysql資料寫入到hudi並同步到hive

測試環境

元件版本

mysql 5。7

hive 3。1。2

flink 1。12。2

hudi 0。9。0

hadoop 3。2。0

首先請確保以下元件正常啟動:

mysql

hivemetastore

hiveserver2

hdfs

yarn

hudi適配hive 3。1。2原始碼編譯

0。9。0版本的hudi在適配hive3時,其hudi/package/hudi-flink-bundle/pom。xml檔案使用的flink-connector-hive版本有問題,所以需要修改pom檔案。

修改點一:

143行,修改為:

org。apache。flink:flink-sql-connector-hive-${hive。version}_${scala。binary。version}

修改點二:

642行,修改為:

flink-sql-connector-hive-${hive。version}_${scala。binary。version}

編譯命令:

mvn clean install -DskipTests -Pflink-bundle-shade-hive3 -Dhadoop。version=3。2。0 -Dhive。version=3。1。2 -Pinclude-flink-sql-connector-hive -U -Dscala。version=2。12。10 -Dscala。binary。version=2。12

將編譯後得到的hudi/package/hudi-flink-bundle/target/hudi-flink-bundle_2。12-0。9。0。jar複製到flink/lib目錄下,將得到的hudi/package/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0。9。0。jar複製到hive/auxlib目錄下,如果沒有這個目錄則新建一個即可。

關於flink操作hudi的相關方法如果有疑惑的可先看本系列的其他文章,例如:使用flink插入資料到hudi資料湖初探 | 從大資料到人工智慧, Flink SQL Client實戰CDC資料入湖 | 從大資料到人工智慧等。

生成測試資料

使用datafaker生成100000條資料,放到mysql資料庫中的stu4表。

datafaker工具使用方法見datafaker ——- 測試資料生成工具 | 從大資料到人工智慧

首先在mysql中新建表test。stu4

create database test;use test;create table stu4 ( id int unsigned auto_increment primary key COMMENT ‘自增id’, name varchar(20) not null comment ‘學生名字’, school varchar(20) not null comment ‘學校名字’, nickname varchar(20) not null comment ‘學生小名’, age int not null comment ‘學生年齡’, score decimal(4,2) not null comment ‘成績’, class_num int not null comment ‘班級人數’, phone bigint not null comment ‘電話號碼’, email varchar(64) comment ‘家庭網路郵箱’, ip varchar(32) comment ‘IP地址’ ) engine=InnoDB default charset=utf8;

新建meta。txt檔案,檔案內容為:

id||int||自增id[:inc(id,1)]name||varchar(20)||學生名字school||varchar(20)||學校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]nickname||varchar(20)||學生小名[:enum(tom,tony,mick,rich,jasper)]age||int||學生年齡[:age]score||decimal(4,2)||成績[:decimal(4,2,1)]class_num||int||班級人數[:int(10, 100)]phone||bigint||電話號碼[:phone_number]email||varchar(64)||家庭網路郵箱[:email]ip||varchar(32)||IP地址[:ipv4]

生成10000條資料並寫入到mysql中的test。stu3表

datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu4 100000 ——meta meta。txt

datafaker工具有詳細使用方法,請參考。

匯入mysql資料

使用flink sql client進行如下操作

構建源表

create table stu4( id bigint not null, name string, school string, nickname string, age int not null, score decimal(4,2) not null, class_num int not null, phone bigint not null, email string, ip string, PRIMARY KEY (id) NOT ENFORCED) with ( ‘connector’ = ‘jdbc’, ‘url’ = ‘jdbc:mysql://hadoop:3306/test?serverTimezone=GMT%2B8’, ‘username’ = ‘root’, ‘password’ = ‘Pass-123-root’, ‘table-name’ = ‘stu4’);

構建目標表

create table stu4_tmp_1( id bigint not null, name string, `school` string, nickname string, age int not null, score decimal(4,2) not null, class_num int not null, phone bigint not null, email string, ip string, primary key (id) not enforced) partitioned by (`school`) with ( ‘connector’ = ‘hudi’, ‘path’ = ‘hdfs://hadoop:9000/tmp/stu4_tmp_1’, ‘table。type’ = ‘COPY_ON_WRITE’, ‘write。precombine。field’ = ‘school’, ‘hive_sync。enable’ = ‘true’, ‘hive_sync。mode’ = ‘hms’, ‘hive_sync。metastore。uris’ = ‘thrift://hadoop:9083’, ‘hive_sync。jdbc_url’ = ‘jdbc:hive2://hadoop:10000’, ‘hive_sync。table’ = ‘stu_tmp_1’, ‘hive_sync。db’ = ‘test’, ‘hive_sync。username’ = ‘hive’, ‘hive_sync。password’ = ‘hive’ );

插入資料

insert into stu4_tmp_1 select * from stu4;

hive資料查詢

使用hive命令進入hive cli

執行如下命令查詢資料

select * from test。stu_tmp_1 limit 10;

結果:

使用flink SQL Client將mysql資料寫入到hudi並同步到hive