Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xxzuo</groupId>
    <artifactId>xxx</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>xxx</name>
    <description>xxx</description>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>3.2.2</hadoop.version>
        <hive.version>3.1.2</hive.version>
        <flink.version>1.14.4</flink.version>
        <druid.version>1.1.20</druid.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
            <version>3.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libfb303</artifactId>
            <version>0.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.antlr</groupId>
            <artifactId>antlr-runtime</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.13</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
 
Kafka source
String bootstrapServers = "192.168.1.1:9092,192.168.1.2:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(bootstrapServers)
        .setTopics("kafka_topic")
        .setGroupId("consumer_group")
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
DataStreamSource<String> stream = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "kafka_source");
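The variables env and tableEnv used throughout are not defined in the snippets in this post. A minimal sketch of how they are typically created (the checkpoint interval is an illustrative value; checkpointing is needed because the streaming Hive sink only commits files and partitions on checkpoints):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The streaming Hive/filesystem sink commits data on checkpoints, so enable checkpointing.
// 60s is only an illustrative interval.
env.enableCheckpointing(60_000);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);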
 
Hive sink
Set up the HiveCatalog
The HiveCatalog uses Hive's metastore to manage and persist Flink metadata, so table definitions do not have to be re-registered every time they are used.
Use the Hive catalog in code
String catalogName = "flinkHive";
HiveCatalog catalog = new HiveCatalog(
        catalogName,
        "flink",
        "/usr/local/hive/apache-hive-3.1.2-bin/conf/"
);
tableEnv.registerCatalog(catalogName, catalog);
tableEnv.useCatalog(catalogName);
Map the Kafka stream to a temporary view
tableEnv.createTemporaryView("flink.kafka_table", stream);
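Note that registering the raw String stream gives the view a single String column. For the later SELECT col1, col2, datetime to work, the Kafka messages first have to be parsed into a typed stream. A minimal sketch, assuming the messages are JSON objects with exactly those three fields (the real message schema is not shown in this post), using the fastjson dependency already declared in the pom:

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.datastream.DataStream;

// Hypothetical record type matching an assumed payload {"col1": ..., "col2": ..., "datetime": ...}
public class KafkaRecord {
    public String col1;
    public String col2;
    public String datetime;
}

// Parse each Kafka message into the record type, then register the typed stream as the view
DataStream<KafkaRecord> parsed = stream
        .map(json -> JSON.parseObject(json, KafkaRecord.class))
        .returns(KafkaRecord.class);
tableEnv.createTemporaryView("flink.kafka_table", parsed);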
 
Create the Hive table
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("CREATE TABLE if not exists flink.hive_table (\n" +
        "hive_col1 STRING,\n" +
        "hive_col2 STRING\n" +
        ") " +
        "PARTITIONED BY (\n" +
        "    day STRING\n" +
        ") " +
        "STORED AS PARQUET\n" +
        "TBLPROPERTIES (\n" +
        "    'sink.partition-commit.trigger' = 'process-time',\n" +
        "    'sink.partition-commit.delay' = '0s',\n" +
        "    'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
        "    'partition.time-extractor.timestamp-pattern' = '$day 00:00:00'\n" +
        ")"
);
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
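With 'sink.partition-commit.trigger' = 'process-time' the commit is driven by machine time plus the delay, and the timestamp-pattern property is not actually consulted. If you want partitions committed by the event time encoded in the partition value instead, a sketch of the alternative properties (table name and delay are illustrative, and the job then needs watermarks assigned on the source):

// Sketch only: same DDL, but committing partitions by partition time instead of machine time.
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("CREATE TABLE if not exists flink.hive_table_pt (\n" +
        "hive_col1 STRING,\n" +
        "hive_col2 STRING\n" +
        ") " +
        "PARTITIONED BY (day STRING) " +
        "STORED AS PARQUET\n" +
        "TBLPROPERTIES (\n" +
        "    'sink.partition-commit.trigger' = 'partition-time',\n" +
        "    'sink.partition-commit.delay' = '1 h',\n" +
        "    'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
        "    'partition.time-extractor.timestamp-pattern' = '$day 00:00:00'\n" +
        ")"
);
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);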
 
Sink data to Hive
TableResult tr = tableEnv.executeSql(
        "INSERT INTO flink.hive_table " +
                "SELECT " +
                "col1,\n" +
                "col2,\n" +
                "datetime\n" +
                " from flink.kafka_table A ");
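In this INSERT the last SELECT column is written into the partition column day, so datetime must already be at day granularity (e.g. 2022-05-01). If it is a full timestamp string, a sketch of truncating it to the day when writing (TO_TIMESTAMP assumes the default yyyy-MM-dd HH:mm:ss format):

// Sketch: derive the `day` partition value from a full timestamp string
TableResult result = tableEnv.executeSql(
        "INSERT INTO flink.hive_table " +
                "SELECT " +
                "col1,\n" +
                "col2,\n" +
                "DATE_FORMAT(TO_TIMESTAMP(datetime), 'yyyy-MM-dd') AS `day`\n" +
                " from flink.kafka_table A ");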