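I. Preface
As new big-data processing frameworks keep appearing, Storm has gradually faded from view, and its community appears to have been largely inactive for nearly three years. In practice, Storm has largely been replaced by Flink. We will therefore study Flink as the core framework for big-data processing and port our existing Storm code to it.
First, let us look at Flink's strengths to understand why choosing it is both worthwhile and necessary.
Flink's advantages and features are well documented online. For details, see the following link: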
https://www.infoq.cn/article/fRt1RF1pxu_ZtmeObOoJ
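This article is hands-on: we first build a Flink runtime environment, then run real code to get a feel for Flink and analyze how it behaves.
II. A first hello world
1.1 Setting up the runtime environment
Use Docker to set up and start a Flink environment quickly. For how to use Docker itself, please search online.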
docker-compose.yml
version: "2.1"
services:
  jobmanager:
    image: flink
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
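Once the containers are running (for example after `docker-compose up -d`), the compose file above publishes the JobManager on port 8081 of the Docker host. The following is a minimal sketch of a reachability check, assuming the check is run on the Docker host itself and that the JobManager answers on its REST `/overview` path; the class name `JobManagerCheck` and the 2-second timeout are illustrative choices, not part of the original setup.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class JobManagerCheck {

    // Builds the URL of the JobManager's REST "overview" endpoint.
    static String overviewUrl(String host, int port) {
        return "http://" + host + ":" + port + "/overview";
    }

    // Returns true if the endpoint answers with HTTP 200 within the timeout.
    static boolean isUp(String url, int timeoutMs) {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setConnectTimeout(timeoutMs);
            conn.setReadTimeout(timeoutMs);
            conn.setRequestMethod("GET");
            int code = conn.getResponseCode();
            conn.disconnect();
            return code == 200;
        } catch (IOException e) {
            // Connection refused, timeout, malformed URL: treat all as "not up".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: run on the Docker host, where port 8081 is published.
        String url = overviewUrl("localhost", 8081);
        System.out.println(url + " reachable: " + isUp(url, 2000));
    }
}
```

If the check reports false, opening http://localhost:8081 in a browser or inspecting the container logs is usually the quickest way to see whether the JobManager started at all.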
1.2 Writing the hello world
Maven project configuration file: pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.tchen</groupId>
  <artifactId>flink</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>flink</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.7.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
  </properties>
  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>
  <dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    -->
    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                  Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.tchen.flink.helloworld.SocketWindowWordCount</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
    <pluginManagement>
      <plugins>
        <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.0.0,)</versionRange>
                    <goals>
                      <goal>shade</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <ignore/>
                  </action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals>
                      <goal>testCompile</goal>
                      <goal>compile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <ignore/>
                  </action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
  <!-- This profile helps to make things run out of the box in IntelliJ -->
  <!-- It adds Flink's core classes to the runtime class path. -->
  <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
  <profiles>
    <profile>
      <id>add-dependencies-for-IDEA</id>
      <activation>
        <property>
          <name>idea.version</name>
        </property>
      </activation>
      <dependencies>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
          <scope>compile</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>compile</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>
</project>
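The shade plugin configuration above writes a `Main-Class` entry into the JAR manifest, and a mismatch between that entry and the real entry point is an easy mistake to miss. The following is a small sketch for inspecting the built JAR. It assumes Maven's default output name `target/flink-0.0.1-SNAPSHOT.jar` (derived from the artifactId and version above); the class name `ShadedJarCheck` is hypothetical.

```java
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class ShadedJarCheck {

    // Reads the Main-Class attribute from a manifest; returns null if it is absent.
    static String mainClassOf(Manifest manifest) {
        return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
    }

    public static void main(String[] args) throws IOException {
        // Assumption: default Maven output path for this pom; pass another path as args[0].
        String path = args.length > 0 ? args[0] : "target/flink-0.0.1-SNAPSHOT.jar";
        try (JarFile jar = new JarFile(path)) {
            Manifest manifest = jar.getManifest();
            String mainClass = (manifest == null) ? null : mainClassOf(manifest);
            System.out.println("Main-Class: " + mainClass);
        }
    }
}
```

Run it after `mvn package`; the printed class should be the fully qualified name of the job's `main` class.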
Java program: SocketWindowWordCount.java
package com.tchen.flink.helloworld;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // Input TCP stream
        final int port = 9008;             // TCP port that nc listens on
        final String host = "10.10.50.70"; // IP address of the Docker host

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(host, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    // split each line on whitespace and emit (word, 1)
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            // 5-second windows that slide forward every 1 second
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {
        public String word;
        public long count;

        // no-argument constructor, required for Flink to treat this class as a POJO
        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
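The call `.timeWindow(Time.seconds(5), Time.seconds(1))` defines windows that are 5 seconds long and start every second, so each word is counted in five overlapping windows. The plain-Java sketch below illustrates that assignment without any Flink dependency. It reflects the sliding-window logic as I understand it, assumes non-negative timestamps and no window offset, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {

    // Returns the start timestamps (in ms) of every sliding window
    // of the given size and slide that contains the given timestamp.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that began at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element arriving at t = 7.3 s with 5 s windows sliding every 1 s.
        System.out.println(windowStarts(7_300L, 5_000L, 1_000L));
        // prints [7000, 6000, 5000, 4000, 3000]
    }
}
```

This is why the job prints a result roughly every second, and why a word typed once keeps appearing in the output for about five seconds.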
III. Local debugging
Example code: FlinkLocal.java
package com.tchen.flink.local;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FlinkLocal {

    public static void main(String[] args) throws Exception {
        // run the job inside the current JVM, no cluster required
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

        // read the input file line by line
        DataSet<String> data = env.readTextFile("file:///d:/flink-data.txt");

        data
            // keep only the lines that start with "http://"
            .filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) {
                    return value.startsWith("http://");
                }
            })
            .writeAsText("file:///d:/flink-data-out.txt");

        // nothing runs until execute() is called
        JobExecutionResult res = env.execute();
        System.out.println("Job finished in " + res.getNetRuntime() + " ms");
    }
}
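When debugging locally it helps to know what the output file should contain before running the job. The sketch below applies the same `startsWith("http://")` predicate to an in-memory list using plain Java streams. The sample lines and the class name `FilterCheck` are invented for illustration and are not taken from the original data file.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterCheck {

    // Keeps only the lines that start with "http://", like the FilterFunction in FlinkLocal.
    static List<String> keepHttp(List<String> lines) {
        return lines.stream()
                .filter(line -> line.startsWith("http://"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical sample input.
        List<String> sample = Arrays.asList(
                "http://flink.apache.org",
                "https://example.org",
                "ftp://example.org",
                "http://example.org/a");
        System.out.println(keepHttp(sample));
        // prints [http://flink.apache.org, http://example.org/a]
    }
}
```

Note that `https://` lines are dropped as well, since the predicate matches the literal prefix `http://` only.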
References:
Official:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/local_setup.html
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/pom.xml
https://flink-docs-cn.gitbook.io/project/05-ying-yong-kai-fa/batch-dataset-api/ben-di-zhi-hang
https://www.iamle.com/archives/2572.html
https://juejin.im/post/5c4f16dbe51d454f342fb7e7
https://www.infoq.cn/article/zbBAGroBgtytDiBs*Xq9