Flink入门:读取Kafka实时数据流,实现WordCount
lipiwang 2024-11-22 17:21 4 浏览 0 评论
本文主要介绍Flink接收一个Kafka文本数据流,进行WordCount词频统计,然后输出到标准输出上。通过本文你可以了解如何编写和运行Flink程序。
代码拆解
首先要设置Flink的执行环境:
// 创建Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
设置Kafka相关参数,连接对应的服务器和端口号,读取名为Shakespeare的Topic中的数据源,将数据源命名为stream:
// Kafka参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-group");
String inputTopic = "Shakespeare";
String outputTopic = "WordCount";
// Source
FlinkKafkaConsumer<String> consumer =
new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);
使用Flink算子处理这个数据流:
// Transformations
// 使用Flink算子对输入流的文本进行操作
// 按空格切词、计数、分区、设置时间窗口、聚合
DataStream<Tuple2<String, Integer>> wordCount = stream
.flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
String[] tokens = line.split("\\s");
// 输出结果 (word, 1)
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<>(token, 1));
}
}
})
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
这里使用的是Flink提供的DataStream级别的API,主要包括转换、分组、窗口和聚合等操作。
将数据流打印:
// Sink
wordCount.print();
最后执行这个程序:
// execute
env.execute("kafka streaming word count");
env.execute 是启动Flink作业所必需的,只有在execute()被调用时,之前调用的各个操作才会在提交到集群上或本地计算机上执行。
完整代码如下:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class WordCountKafkaInStdOut {
public static void main(String[] args) throws Exception {
// 创建Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-group");
String inputTopic = "Shakespeare";
String outputTopic = "WordCount";
// Source
FlinkKafkaConsumer<String> consumer =
new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);
// Transformations
// 使用Flink算子对输入流的文本进行操作
// 按空格切词、计数、分区、设置时间窗口、聚合
DataStream<Tuple2<String, Integer>> wordCount = stream
.flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
String[] tokens = line.split("\\s");
// 输出结果 (word, 1)
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<>(token, 1));
}
}
})
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
// Sink
wordCount.print();
// execute
env.execute("kafka streaming word count");
}
}
执行程序
我们在Kafka能做什么?十分钟构建你的实时数据流管道这篇文章中曾提到如何启动一个Kafka集群,并向某个Topic内发送数据流。在本次Flink作业启动之前,我们先要按照那篇文章中提到的方式启动一个Kafka集群,创建对应的Topic,并向Topic中写入数据。
Intellij Idea调试执行
在IntelliJ Idea中,点击绿色按钮,执行这个程序。下图中任意两个绿色按钮都可以启动程序。
IntelliJ Idea下方会显示程序中输出到标准输出上的内容,包括本次需要打印的结果。
恭喜你,你的第一个Flink程序运行成功!
在集群上提交作业
第一步中我们已经下载并搭建了本地集群,接着我们在模板的基础上添加了代码,并可以在IntelliJ Idea中调试运行。在生产环境,一般需要将代码编译打包,提交到集群上。
注意,这里涉及两个目录,一个是我们存放我们刚刚编写代码的工程目录,简称工程目录,另一个是从Flink官网下载解压的Flink主目录,主目录下的bin目录中有Flink提供好的命令行工具。
进入工程目录,使用Maven命令行编译打包:
# 使用Maven将自己的代码编译打包
# 打好的包一般放在工程目录的target子文件夹下
$ mvn clean package
回到刚刚下载解压的Flink主目录,使用Flink提供的命令行工具flink,将我们刚刚打包好的作业提交到集群上。命令行的参数--class用来指定哪个主类作为入口。我们之后会介绍命令行的具体使用方法。
$ bin/flink run --class com.flink.tutorials.java.api.projects.wordcount.WordCountKafkaInStdOut /Users/luweizheng/Projects/big-data/flink-tutorials/target/flink-tutorials-0.1.jar
这时,仪表盘上就多了一个Flink程序。
程序的输出会打到Flink主目录下面的log目录下的.out文件中,使用下面的命令查看结果:
$ tail -f log/flink-*-taskexecutor-*.out
停止本地集群:
$ ./bin/stop-cluster.sh
Flink开发和调试过程中,一般有几种方式执行程序:
- 使用IntelliJ Idea内置的运行按钮。这种方式主要在本地调试时使用。
- 使用Flink提供的标准命令行工具向集群提交作业,包括Java和Scala程序。这种方式更适合生产环境。
- 使用Flink提供的其他命令行工具,比如针对Scala、Python和SQL的交互式环境。这种方式也是在调试时使用。
相关推荐
- ubuntu单机安装open-falcon极度详细操作
-
备注:以下操作均由本人实际操作并得到验证,喜欢的同学可尝试操作安装。步骤一1.1环境准备(使用系统:ubuntu18.04)1.1.1安装redisubuntu下安装(参考借鉴:https://...
- Linux搭建promtail、loki、grafana轻量日志监控系统
-
一:简介日志监控告警系统,较为主流的是ELK(Elasticsearch、Logstash和Kibana核心套件构成),虽然优点是功能丰富,允许复杂的操作。但是,这些方案往往规模复杂,资源占用高,...
- 一文搞懂,WAF阻止恶意攻击的8种方法
-
WAF(Web应用程序防火墙)是应用程序和互联网流量之间的第一道防线,它监视和过滤Internet流量以阻止不良流量和恶意请求,WAF是确保Web服务的可用性和完整性的重要安全解决方案。它...
- 14配置appvolume(ios14.6配置文件)
-
使用AppVolumes应用程序功能,您可以管理应用程序的整个生命周期,包括打包、更新和停用应用程序。您还可以自定义应用程序分配,以向最终用户提供应用程序的特定版本14.1安装appvolume...
- 目前流行的缺陷管理工具(缺陷管理方式存在的优缺点)
-
摘自:https://blog.csdn.net/jasonteststudy/article/details/7090127?utm_medium=distribute.pc_relevant.no...
- 开源数字货币交易所开发学习笔记(2)——SpringCloud
-
前言码云(Gitee)上开源数字货币交易所源码CoinExchange的整体架构用了SpringCloud,对于经验丰富的Java程序员来说,可能很简单,但是对于我这种入门级程序员,还是有学习的必要的...
- 开发JAX-RPC Web Services for WebSphere(下)
-
在开发JAX-RPCWebServicesforWebSphere(上)一文中,小编为大家介绍了如何创建一个Web服务项目、如何创建一个服务类和Web服务,以及部署项目等内容。接下来小编将为大...
- CXF学习笔记1(cxf client)
-
webservice是发布服务的简单并实用的一种技术了,个人学习了CXF这个框架,也比较简单,发布了一些笔记,希望对笔友收藏并有些作用哦1.什么是webServicewebService让一个程序可...
- 分布式RPC最全详解(图文全面总结)
-
分布式通信RPC是非常重要的分布式系统组件,大厂经常考察的Dubbo等RPC框架,下面我就全面来详解分布式通信RPC@mikechen本篇已收于mikechen原创超30万字《阿里架构师进阶专题合集》...
- Oracle WebLogic远程命令执行0day漏洞(CVE-2019-2725补丁绕过)预警
-
概述近日,奇安信天眼与安服团队通过数据监控发现,野外出现OracleWebLogic远程命令执行漏洞最新利用代码,此攻击利用绕过了厂商今年4月底所发布的最新安全补丁(CVE-2019-2725)。由...
- Spring IoC Container 原理解析(spring中ioc三种实现原理)
-
IoC、DI基础概念关于IoC和DI大家都不陌生,我们直接上martinfowler的原文,里面已经有DI的例子和spring的使用示例《InversionofControlContainer...
- Arthas线上服务器问题排查(arthas部署)
-
1Arthas(阿尔萨斯)能为你做什么?这个类从哪个jar包加载的?为什么会报各种类相关的Exception?我改的代码为什么没有执行到?难道是我没commit?分支搞错了?遇到问题无法在...
- 工具篇之IDEA功能插件HTTP_CLENT(idea2021插件)
-
工具描述:Java开发人员通用的开发者工具IDEA集成了HTTPClient功能,之后可以无需单独安装使用PostMan用来模拟http请求。创建方式:1)简易模式Tools->HTTPCl...
- RPC、Web Service等几种远程监控通信方式对比
-
几种远程监控通信方式的介绍一.RPCRPC使用C/S方式,采用http协议,发送请求到服务器,等待服务器返回结果。这个请求包括一个参数集和一个文本集,通常形成“classname.meth...
- 《github精选系列》——SpringBoot 全家桶
-
1简单总结1SpringBoot全家桶简介2项目简介3子项目列表4环境5运行6后续计划7问题反馈gitee地址:https://gitee.com/yidao620/springbo...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- maven镜像 (69)
- undefined reference to (60)
- zip格式 (63)
- oracle over (62)
- date_format函数用法 (67)
- 在线代理服务器 (60)
- shell 字符串比较 (74)
- x509证书 (61)
- localhost (65)
- java.awt.headless (66)
- syn_sent (64)
- settings.xml (59)
- 弹出窗口 (56)
- applicationcontextaware (72)
- my.cnf (73)
- httpsession (62)
- pkcs7 (62)
- session cookie (63)
- java 生成uuid (58)
- could not initialize class (58)
- beanpropertyrowmapper (58)
- word空格下划线不显示 (73)
- jar文件 (60)
- jsp内置对象 (58)
- makefile编写规则 (58)