干货|Flink入门教程(二)
lipiwang 2024-11-22 17:20 5 浏览 0 评论
2 基本概念
2.1 DataStream和DataSet
Flink使用DataStream、DataSet在程序中表示数据,我们可以将它们视为可以包含重复项的不可变数
据集合。
DataSet是有限数据集(比如某个数据文件),而DataStream的数据可以是无限的(比如kafka队列中
的消息)。
这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着一旦创建它们就无法
添加或删除元素。你也不能简单地检查里面的元素。
数据流通过fifilter/map等各种方法,执行过滤、转换、合并、拆分等操作,达到数据计算的目的。
2.2 数据类型
Flink对DataSet或DataStream中可以包含的元素类型设置了一些限制,以便于更有效的执行策略。
有六种不同类别的数据类型:
1. Java元组和Scala案例类
2. Java POJO
3. 原始类型
4. 常规类
5. 值
6. Hadoop Writables
2.2.1 元组
元组是包含固定数量的具有各种类型的字段的复合类型。Java API提供了 Tuple1 到 Tuple25 。元组的
每个字段都可以是包含更多元组的任意Flink类型,从而产生嵌套元组。可以使用字段名称直接访问元组
的字段 tuple.f4 ,或使用通用getter方法 tuple.getField(int position) 。字段索引从0开始。请
注意,这与Scala元组形成对比,但它与Java的一般索引更为一致。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 2));
wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
@Override
public Integer map(Tuple2<String, Integer> value) throws Exception {
return value.f1;
}
});
wordCounts.keyBy(0); // also valid .keyBy("f0")
2.2.2 POJOs
如果满足以下要求,则Flink将Java和Scala类视为特殊的POJO数据类型:
- 类必须公开类
- 它必须有一个没有参数的公共构造函数(默认构造函数)。
- 所有字段都是公共的,或者必须通过getter和setter函数访问。对于一个名为 foo 的属性的getter
和setter方法的字段必须命名 getFoo() 和 setFoo() 。
- 注册的序列化程序必须支持字段的类型。
序列化:
POJO通常使用PojoTypeInfo和PojoSerializer(使用Kryo作为可配置的回退)序列化。例外情况是
POJO实际上是Avro类型(Avro特定记录)或生成为“Avro反射类型”。在这种情况下,POJO使用
AvroTypeInfo和AvroSerializer序列化。如果需要,还可以注册自己的自定义序列化程序
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by field expression "word"
2.2.3 基础数据类型
Flink支持所有Java和Scala的原始类型,如 Integer , String 和 Double 。
2.2.4 常规类
Flink支持大多数Java和Scala类(API和自定义)。限制适用于包含无法序列化的字段的类,如文件指
针,I / O流或其他本机资源。遵循Java Beans约定的类通常可以很好地工作。
所有未标识为POJO类型的类都由Flink作为常规类类型处理。Flink将这些数据类型视为黑盒子,并且无
法访问其内容(例如,用于有效排序)。使用序列化框架Kryo对常规类型进行反序列化。
2.2.5 值
值类型手动描述其序列化和反序列化。它们不是通过通用序列化框架,而是通过
org.apache.flinktypes.Value 使用方法 read 和实现接口为这些操作提供自定义代码 write 。当通
用序列化效率非常低时,使用值类型是合理的。一个示例是将元素的稀疏向量实现为数组的数据类型。
知道数组大部分为零,可以对非零元素使用特殊编码,而通用序列化只需编写所有数组元素。
该 org.apache.flinktypes.CopyableValue 接口以类似的方式支持手动内部克隆逻辑。
Flink带有与基本数据类型对应的预定义值类型。( ByteValue , ShortValue , IntValue ,
LongValue , FloatValue , DoubleValue , StringValue , CharValue , BooleanValue )。这
些值类型充当基本数据类型的可变变体:它们的值可以被更改,允许程序员重用对象并从垃圾收集器中
消除压力。
2.2.6. Hadoop Writables
使用实现 org.apache.hadoop.Writable 接口的类型。
write() 和 readFields() 方法中定义的序列化逻辑将用于序列化。
2.3 数据的操作
数据转换,即通过从一个或多个 DataStream 生成新的DataStream 的过程,是主要的数据处理的手
段。Flink 提供了多种数据转换操作,基本可以满足所有的日常使用场景。
2.4 窗口的含义
Flink计算引擎中,时间是一个非常重要的概念,Flink的时间分为三种时间:
- EventTime: 事件发生的时间
- IngestionTime:事件进入 Flink 的时间
- ProcessingTime:事件被处理时的时间
窗口是Flink流计算的一个核心概念,Flink窗口主要包括:
- 时间窗口
- 翻滚时间窗口
- 滑动时间窗口
- 数量窗口
- 翻滚数量窗口
- 滑动数量窗口
按照形式来划分,窗口又分为:
- 翻滚窗口
- 滑动窗口
2.5 有状态的流式处理
在很多场景下,数据都是以持续不断的流事件创建。例如网站的交互、或手机传输的信息、服务器日
志、传感器信息等。有状态的流处理(stateful stream processing)是一种应用设计模式,用于处理无
边界的流事件。
对于任何处理流事件的应用来说,并不会仅仅简单的一次处理一个记录就完事了。在对数据进行处理或
转换时,操作应该是有状态的。也就是说,需要有能力做到对处理记录过程中生成的中间数据进行存储
及访问。当一个应用收到一个 事件,在对其做处理时,它可以从状态信息(state)中读取数据进行协
助处理。或是将数据写入state。在这种准则下,状态信息(state)可以被存储(及访问)在很多不同
的地方,例如程序变量,本地文件,或是内置的(或外部的)数据库中。
Apache Flink 存储应用状态信息在本地内存或是一个外部数据库中。因为Flink 是一个分布式系统,本
地状态信息需要被有效的保护,以防止在应用或是硬件挂掉之后,造成数据丢失。Flink对此采取的机制
是:定期为应用状态(application state)生成一个一致(consistent)的checkpoint,并写入到一个
远端持久性的存储中。下面是一个有状态的流处理Flink application的示例图:
Stateful stream processing 应用的输入一般为:事件日志(event log)的持续事件。Event log 存储
并且分发事件流。事件被写入一个持久性的,仅可追加的(append-only)日志中。也就是说,被写入
的事件的顺序始终是不变的。所以事件在发布给多个不同用户时,均是以完全一样的顺序发布的。在开
源的event log 系统中,最著名的当属 Kafka。
使用flflink流处理程序连接event log的理由有多种。在这个架构下,event log 持久化输入的 events,并
且可以以既定的顺序重现这些事件。万一应用发生了某个错误,Flink会通过前一个checkpoint 恢复应
用的状态,并重置在event log 中的读取位置,并据此对events做重现,直到它抵达stream 的末端。这
个技术不仅被用于错误恢复,并且也可以用于更新应用,修复bugs,以及修复之前遗漏结果等场景中。
相关推荐
- ubuntu单机安装open-falcon极度详细操作
-
备注:以下操作均由本人实际操作并得到验证,喜欢的同学可尝试操作安装。步骤一1.1环境准备(使用系统:ubuntu18.04)1.1.1安装redisubuntu下安装(参考借鉴:https://...
- Linux搭建promtail、loki、grafana轻量日志监控系统
-
一:简介日志监控告警系统,较为主流的是ELK(Elasticsearch、Logstash和Kibana核心套件构成),虽然优点是功能丰富,允许复杂的操作。但是,这些方案往往规模复杂,资源占用高,...
- 一文搞懂,WAF阻止恶意攻击的8种方法
-
WAF(Web应用程序防火墙)是应用程序和互联网流量之间的第一道防线,它监视和过滤Internet流量以阻止不良流量和恶意请求,WAF是确保Web服务的可用性和完整性的重要安全解决方案。它...
- 14配置appvolume(ios14.6配置文件)
-
使用AppVolumes应用程序功能,您可以管理应用程序的整个生命周期,包括打包、更新和停用应用程序。您还可以自定义应用程序分配,以向最终用户提供应用程序的特定版本14.1安装appvolume...
- 目前流行的缺陷管理工具(缺陷管理方式存在的优缺点)
-
摘自:https://blog.csdn.net/jasonteststudy/article/details/7090127?utm_medium=distribute.pc_relevant.no...
- 开源数字货币交易所开发学习笔记(2)——SpringCloud
-
前言码云(Gitee)上开源数字货币交易所源码CoinExchange的整体架构用了SpringCloud,对于经验丰富的Java程序员来说,可能很简单,但是对于我这种入门级程序员,还是有学习的必要的...
- 开发JAX-RPC Web Services for WebSphere(下)
-
在开发JAX-RPCWebServicesforWebSphere(上)一文中,小编为大家介绍了如何创建一个Web服务项目、如何创建一个服务类和Web服务,以及部署项目等内容。接下来小编将为大...
- CXF学习笔记1(cxf client)
-
webservice是发布服务的简单并实用的一种技术了,个人学习了CXF这个框架,也比较简单,发布了一些笔记,希望对笔友收藏并有些作用哦1.什么是webServicewebService让一个程序可...
- 分布式RPC最全详解(图文全面总结)
-
分布式通信RPC是非常重要的分布式系统组件,大厂经常考察的Dubbo等RPC框架,下面我就全面来详解分布式通信RPC@mikechen本篇已收于mikechen原创超30万字《阿里架构师进阶专题合集》...
- Oracle WebLogic远程命令执行0day漏洞(CVE-2019-2725补丁绕过)预警
-
概述近日,奇安信天眼与安服团队通过数据监控发现,野外出现OracleWebLogic远程命令执行漏洞最新利用代码,此攻击利用绕过了厂商今年4月底所发布的最新安全补丁(CVE-2019-2725)。由...
- Spring IoC Container 原理解析(spring中ioc三种实现原理)
-
IoC、DI基础概念关于IoC和DI大家都不陌生,我们直接上martinfowler的原文,里面已经有DI的例子和spring的使用示例《InversionofControlContainer...
- Arthas线上服务器问题排查(arthas部署)
-
1Arthas(阿尔萨斯)能为你做什么?这个类从哪个jar包加载的?为什么会报各种类相关的Exception?我改的代码为什么没有执行到?难道是我没commit?分支搞错了?遇到问题无法在...
- 工具篇之IDEA功能插件HTTP_CLENT(idea2021插件)
-
工具描述:Java开发人员通用的开发者工具IDEA集成了HTTPClient功能,之后可以无需单独安装使用PostMan用来模拟http请求。创建方式:1)简易模式Tools->HTTPCl...
- RPC、Web Service等几种远程监控通信方式对比
-
几种远程监控通信方式的介绍一.RPCRPC使用C/S方式,采用http协议,发送请求到服务器,等待服务器返回结果。这个请求包括一个参数集和一个文本集,通常形成“classname.meth...
- 《github精选系列》——SpringBoot 全家桶
-
1简单总结1SpringBoot全家桶简介2项目简介3子项目列表4环境5运行6后续计划7问题反馈gitee地址:https://gitee.com/yidao620/springbo...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- maven镜像 (69)
- undefined reference to (60)
- zip格式 (63)
- oracle over (62)
- date_format函数用法 (67)
- 在线代理服务器 (60)
- shell 字符串比较 (74)
- x509证书 (61)
- localhost (65)
- java.awt.headless (66)
- syn_sent (64)
- settings.xml (59)
- 弹出窗口 (56)
- applicationcontextaware (72)
- my.cnf (73)
- httpsession (62)
- pkcs7 (62)
- session cookie (63)
- java 生成uuid (58)
- could not initialize class (58)
- beanpropertyrowmapper (58)
- word空格下划线不显示 (73)
- jar文件 (60)
- jsp内置对象 (58)
- makefile编写规则 (58)