Flink: reading data from Kafka and writing it into Hive

Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.xxzuo</groupId>
<artifactId>xxx</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>xxx</name>
<description>xxx</description>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.binary.version>2.11</scala.binary.version>
<hadoop.version>3.2.2</hadoop.version>
<hive.version>3.1.2</hive.version>
<flink.version>1.14.4</flink.version>
<druid.version>1.1.20</druid.version>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>

<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<version>0.9.3</version>
</dependency>

<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>

</dependencies>

<build>
<plugins>
<!-- Compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- Assembly plugin (builds a jar that bundles all dependencies) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- Optionally set the jar's main class here -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

Kafka source

String bootstrapServers = "192.168.1.1:9092,192.168.1.2:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(bootstrapServers)
        .setTopics("kafka_topic")
        .setGroupId("consumer_group")
        // start from the committed consumer-group offsets, falling back to EARLIEST if none exist
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        // only the record value is deserialized, as a plain string
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
DataStreamSource<String> stream = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "kafka_source");
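
This snippet and the ones below assume that a StreamExecutionEnvironment (env) and a StreamTableEnvironment (tableEnv) have already been created. A minimal sketch of that setup follows; note that checkpointing must be enabled, because the streaming writer only commits files (and therefore Hive partitions) when a checkpoint completes. The 60-second interval is just an example value.

// assumed setup for the env / tableEnv variables used throughout this post
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required: files are only committed to Hive partitions when a checkpoint completes
env.enableCheckpointing(60 * 1000L);   // 60s is an example interval, tune as needed
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);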

Hive sink

Setting up the HiveCatalog

The HiveCatalog lets Flink use the Hive metastore to manage and persist its own metadata, so tables do not have to be re-registered every time a job runs.
Using the Hive catalog in code:

// set up the catalog
String catalogName = "flinkHive";
// the last argument is the Hive configuration directory (where hive-site.xml lives)
HiveCatalog catalog = new HiveCatalog(
        catalogName,
        "flink",
        "/usr/local/hive/apache-hive-3.1.2-bin/conf/"
);
tableEnv.registerCatalog(catalogName, catalog);
// use the registered catalog
tableEnv.useCatalog(catalogName);
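
The second constructor argument ("flink") is the catalog's default database. If that database does not exist in Hive yet, it can be created up front; a small sketch (the database name simply mirrors the one used above):

// create the target Hive database if it is missing (runs against the registered Hive catalog)
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink");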

Mapping the Kafka stream to a temporary table

// create a temporary view that maps the Kafka source stream
tableEnv.createTemporaryView("flink.kafka_table", stream);
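
Registered like this, the raw string stream exposes only a single column (f0), which does not line up with the named columns selected further down. One way to bridge that gap, sketched below, is to parse each message into a named Row before registering the view; this assumes the Kafka messages are JSON objects carrying col1, col2 and datetime fields, which is purely illustrative:

// illustrative: parse each Kafka message into a Row with named, typed fields
SingleOutputStreamOperator<Row> rowStream = stream
        .map(value -> {
            // fastjson is already on the classpath via the pom above
            JSONObject json = JSON.parseObject(value);
            return Row.of(json.getString("col1"),
                    json.getString("col2"),
                    json.getString("datetime"));
        })
        .returns(Types.ROW_NAMED(
                new String[]{"col1", "col2", "datetime"},
                Types.STRING, Types.STRING, Types.STRING));

// register the parsed stream (in place of the raw string stream above) so the view has real column names
tableEnv.createTemporaryView("flink.kafka_table", rowStream);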

Creating the Hive table

// switch the SQL dialect to HIVE before running Hive DDL
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(" CREATE TABLE if not exists flink.hive_table (\n" +
        "hive_col1 STRING,\n" +
        "hive_col2 STRING\n" +
        " ) " +
        "PARTITIONED BY (\n" +
        " day STRING\n" +
        " ) " +
        "STORED AS PARQUET\n" +
        " TBLPROPERTIES (\n" +
        // partition-commit.trigger: what triggers a partition commit
        // process-time: trigger based on processing time
        // partition-time: trigger based on the partition time extracted from event time
        " 'sink.partition-commit.trigger' = 'process-time',\n" +
        // partition-commit.delay: how long to wait before committing a partition
        " 'sink.partition-commit.delay' = '0s',\n" +
        // partition-commit.policy.kind: how a committed partition is published
        // metastore: add the partition to the Hive metastore
        // success-file: write a _SUCCESS file into the partition directory
        " 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
        // time-extractor.timestamp-pattern: pattern used to extract a timestamp from the partition values
        " 'partition.time-extractor.timestamp-pattern'='$day 00:00:00'" +
        " )"
);
// switch the dialect back to DEFAULT once the Hive DDL is done
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
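
Because the DDL goes through the HiveCatalog, the table ends up in the Hive metastore and survives job restarts; on later runs the IF NOT EXISTS guard simply skips creation. A quick illustrative check from the Flink side:

// list the tables in the current database (flink) to confirm hive_table was created
tableEnv.executeSql("SHOW TABLES").print();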

Sinking the data to Hive

TableResult tr = tableEnv.executeSql(
        "INSERT INTO flink.hive_table " +
        "SELECT " +
        "col1,\n" +
        "col2,\n" +
        // the last select column fills the partition column `day` of hive_table
        "datetime\n" +
        " from flink.kafka_table A ");
