百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

Flink核心技术之一:状态管理与checkPoint数据容错机制

lipiwang 2024-11-22 17:21 4 浏览 0 评论

Flink 状态管理与checkPoint数据容错机制深入剖析

1 何为状态

  • 在批处理过程中,数据是划分为块分片去完成的,然后每一个Task去处理一个分片。当分片执行完成后,把输出聚合起来就是最终的结果。在这个过程当中,对于state的需求还是比较小的。
  • 在流计算过程中,对State有非常高的要求,因为在流系统中输入是一个无限制的流,会持续运行从不间断。在这个过程当中,就需要将状态数据很好的管理起来。
  • Flink的失败恢复依赖于“检查点机制+可部分重发的数据源”。
  • 检查点机制:检查点定期触发,产生快照,快照中记录了(1)当前检查点开始时数据源(例如Kafka)中消息的offset,(2)记录了所有有状态的operator当前的状态信息(例如sum中的数值)。
  • 可部分重发的数据源:Flink选择最近完成的检查点K。然后系统重放整个分布式的数据流,然后给予每个operator他们在检查点k快照中的状态。数据源被设置为从位置Sk开始重新读取流。例如在Apache Kafka中,那意味着告诉消费者从偏移量Sk开始重新消费。
  • Flink中有两种基本类型的State,即Keyed State和Operator State。
  • State可以被记录,在失败的情况下数据还可以恢复

一句话的事儿:state一般指一个具体的task/operator的状态【state数据默认保存在java的堆内存中】

2 检查点Checkpoint 与Barrier

一句话的事儿: checkpoint【可以理解为checkpoint是把state数据持久化存储了】,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态

为了保证state的容错性,Flink需要对state进行checkpoint。

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常

Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提是: 持久化的source(如kafka),它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等) 用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)

Flink的检查点机制实现了标准的Chandy-Lamport算法,并用来实现分布式快照。在分布式快照当中,有一个核心的元素:Barrier。

  • 单流的barrier:
  • 1: 屏障作为数据流的一部分随着记录被注入到数据流中。屏障永远不会赶超通常的流记录,它会严格遵循顺序。
  • 2: 屏障将数据流中的记录隔离成一系列的记录集合,并将一些集合中的数据加入到当前的快照中,而另一些数据加入到下一个快照中。
  • 3: 每一个屏障携带着快照的ID,快照记录着ID并且将其放在快照数据的前面。
  • 4: 屏障不会中断流处理,因此非常轻量级。
  • 并行barrier
  • 1:不止一个输入流的时的operator,需要在快照屏障上对齐(align)输入流,才会发射出去。 2:可以看到1,2,3会一直放在Input buffer,直到另一个输入流的快照到达Operator。

3 有状态的Operator工作一览图

Stateful Flink applications are optimized for local state access. Task state 
is always maintained in memory or, if the state size exceeds the available memory,
in access-efficient on-disk data structures. Hence, tasks perform all computations 
by accessing local, often in-memory, state yielding very low processing latencies.
Flink guarantees exactly-once state consistency in case of failures by periodically 
and asynchronously checkpointing the local state to durable storage.
复制代码

4 状态管理

4.1 原始状态与托管状态

Keyed State和Operator State,可以以两种形式存在:

  • 原始状态(raw state)
  • 托管状态(managed state)
  • 托管状态是由Flink框架管理的状态
  • 原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。
  • 通常在DataStream上的状态推荐使用托管的状态。
  • 当实现一个用户自定义的operator时,会使用到原始状态

4.2 State-Keyed State 是什么?直接上干货。(兄弟 State-Operator State 与key无关)

  • 顾名思义,就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state。 stream.keyBy(…)
  • state的数据结构;
  • (1) ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值
  • (2) ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值
  • (3) ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值
  • (4) MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素
  • 需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄。实际上:这些状态有三种存储方式:
 MemoryStateBackend:
 FsStateBackend
 RockDBStateBackend
复制代码

4.3 State-Keyed State 存储方式?直接上干货

  • MemoryStateBackend
  • state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中 基于内存的state backend在生产环境下不建议使用。
  • FsStateBackend
  • state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中,可以使用hdfs等分布式文件系统。
  • RocksDBStateBackend
  • RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时RocksDB需要配置一个远端的filesystem。
  • uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地。
  • RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用

4.4 State 生成快照

4.5 State 快照恢复

5 与Key相关的状态管理案例实战(以Key分组进行状态管理)

5.1 RichFlatMapFunction 核心代码奉上

package xuwei.tech.streaming;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
/**
 * qinkaixin 2018 11 24 
 */
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 /**
 * The ValueState handle. The first field is the count, the second field a running sum.
 */
 private transient ValueState<Tuple2<Long, Long>> sum;
 @Override
 public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
 // access the state value
 Tuple2<Long, Long> currentSum = sum.value();
 // update the count
 currentSum.f0 += 1;
 // add the second field of the input value
 currentSum.f1 += input.f1;
 // update the state
 sum.update(currentSum);
 // if the count reaches 2, emit the average and clear the state
 if (currentSum.f0 >= 2) {
 out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
 sum.clear();
 }
 }
 @Override
 public void open(Configuration config) {
 ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
 new ValueStateDescriptor<>(
 "average", 
 TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), 
 Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
 sum = getRuntimeContext().getState(descriptor);
 }
}
复制代码

5.2 RichFlatMapFunction 执行操作

public static void main(String[] args) throws Exception{
 //获取Flink的运行环境
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
 .keyBy(0)
 .flatMap(new CountWindowAverage())
 .print();
 env.execute("StafulOperator");
 System.out.println("***********");
}
复制代码

5.3 最终结果为什么是这样的?

  • if the count reaches 2, emit the average and clear the state
  • 所以Tuple2.of(1L, 3L), Tuple2.of(1L, 5L) 一组
  • 所以Tuple2.of(1L, 7L),Tuple2.of(1L, 4L)一组

6 与Operator相关的State案例实战

  • 与Key无关的State,与Operator绑定的state,整个operator只对应一个state
  • 保存Operator state的数据结构为ListState
  • 举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射
  • 继承CheckpointedFunction,实现snapshotState和restoreState。
 To use managed operator state, a stateful function can implement either 
 the more general CheckpointedFunction interface, or the 
 ListCheckpointed<T extends Serializable> interface.
 Whenever a checkpoint has to be performed, snapshotState() is called. The 
 counterpart,initializeState(), is called every time the user-defined function 
 is initialized, be that when the function is first initialized or be that when the function is actuallyrecovering from an earlier checkpoint. Given this,
 initializeState() is not only the place where different types of state are
 initialized, but also where state recovery
 logic is included.
复制代码

6.1 BufferingSink案例

 public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
 CheckpointedFunction {
 private final int threshold;
 private transient ListState<Tuple2<String, Integer>> checkpointedState;
 private List<Tuple2<String, Integer>> bufferedElements;
 public BufferingSink(int threshold) {
 this.threshold = threshold;
 this.bufferedElements = new ArrayList<>();
 }
 @Override
 public void invoke(Tuple2<String, Integer> value) throws Exception {
 bufferedElements.add(value);
 if (bufferedElements.size() == threshold) {
 for (Tuple2<String, Integer> element: bufferedElements) {
 // send it to the sink
 }
 bufferedElements.clear();
 }
 }
 @Override
 public void snapshotState(FunctionSnapshotContext context) throws Exception {
 checkpointedState.clear();
 for (Tuple2<String, Integer> element : bufferedElements) {
 checkpointedState.add(element);
 }
 }
 @Override
 public void initializeState(FunctionInitializationContext context) throws Exception {
 ListStateDescriptor<Tuple2<String, Integer>> descriptor =
 new ListStateDescriptor<>(
 "buffered-elements",
 TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
 checkpointedState = context.getOperatorStateStore().getListState(descriptor);
 if (context.isRestored()) {
 for (Tuple2<String, Integer> element : checkpointedState.get()) {
 bufferedElements.add(element);
 }
 }
 }
}
复制代码

6.2 Stateful Source案例

 public static class CounterSource extends RichParallelSourceFunction<Long>
 implements ListCheckpointed<Long> {
 /** current offset for exactly once semantics */
 private Long offset;
 /** flag for job cancellation */
 private volatile boolean isRunning = true;
 @Override
 public void run(SourceContext<Long> ctx) {
 final Object lock = ctx.getCheckpointLock();
 while (isRunning) {
 // output and state update are atomic
 synchronized (lock) {
 ctx.collect(offset);
 offset += 1;
 }
 }
 }
 @Override
 public void cancel() {
 isRunning = false;
 }
 @Override
 public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
 return Collections.singletonList(offset);
 }
 @Override
 public void restoreState(List<Long> state) {
 for (Long s : state)
 offset = s;
 }
}
复制代码

7 checkPoint的配置进一步升华

7.1 checkpoint 开关

  • 默认checkpoint功能是disabled的,想要使用的时候需要先启用
  • checkpoint开启之后,默认的checkPointMode是Exactly-once
  • checkpoint的checkPointMode有两种,Exactly-once和At-least-once
  • Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)

7.2 checkpoint 调优配置(Cancel处理很有意思)

  • 默认checkpoint功能是disabled的,想要使用的时候需要先启用
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 // 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
 env.enableCheckpointing(1000);
 // 高级选项:
 // 设置模式为exactly-once (这是默认值)
 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
 // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
 env.getCheckpointConfig().setCheckpointTimeout(60000);
 // 同一时间只允许进行一个检查点
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
 cancel处理选项:
 (1)ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:
 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定
 的Checkpoint
 
 (2)ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:
 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会
 保存checkpoint
复制代码

8 State Backend 状态的后端存储(一剑封喉)

8.1 配置说明

修改State Backend的两种方式

  • 第一种:单任务调整
 修改当前任务代码
 env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
 或者new MemoryStateBackend()
 或者new RocksDBStateBackend( hdfs->url, true);【需要添加第三方依赖】
复制代码
  • 第二种:全局调整
 修改flink-conf.yaml
 state.backend: filesystem
 state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
 注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
复制代码

8.2 精彩案例实战

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCountJavaCheckPoint {
 public static void main(String[] args) throws Exception{
 //获取需要的端口号
 int port;
 try {
 ParameterTool parameterTool = ParameterTool.fromArgs(args);
 port = parameterTool.getInt("port");
 }catch (Exception e){
 System.err.println("No port set. use default port 9000--java");
 port = 9010;
 }
 //获取flink的运行环境
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 // 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
 env.enableCheckpointing(1000);
 
 // 高级选项:
 // 设置模式为exactly-once (这是默认值)
 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 
 // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
 
 // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
 env.getCheckpointConfig().setCheckpointTimeout(60000);
 
 // 同一时间只允许进行一个检查点
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
 // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
 //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
 //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
 
 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 //设置statebackend
 //env.setStateBackend(new MemoryStateBackend());
 //env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
 //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
 String hostname = "SparkMaster";
 String delimiter = "\n";
 //连接socket获取输入的数据
 DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
 // a a c
 // a 1
 // a 1
 // c 1
 DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
 public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
 String[] splits = value.split("\\s");
 for (String word : splits) {
 out.collect(new WordWithCount(word, 1L));
 }
 }
 }).keyBy("word")
 .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒
 .sum("count");//在这里使用sum或者reduce都可以
 /*.reduce(new ReduceFunction<WordWithCount>() {
 public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
 return new WordWithCount(a.word,a.count+b.count);
 }
 })*/
 //把数据打印到控制台并且设置并行度
 windowCounts.print().setParallelism(1);
 //这一行代码一定要实现,否则程序不执行
 env.execute("Socket window count");
 }
 public static class WordWithCount{
 public String word;
 public long count;
 public WordWithCount(){}
 public WordWithCount(String word,long count){
 this.word = word;
 this.count = count;
 }
 @Override
 public String toString() {
 return "作者 : 秦凯新 , 窗大小2秒,滑动1秒 {" +
 " word='" + word + '\'' +
 ", count=" + count +
 '}';
 }
 }
}
复制代码

8.3 精彩案例结果

相关推荐

ubuntu单机安装open-falcon极度详细操作

备注:以下操作均由本人实际操作并得到验证,喜欢的同学可尝试操作安装。步骤一1.1环境准备(使用系统:ubuntu18.04)1.1.1安装redisubuntu下安装(参考借鉴:https://...

Linux搭建promtail、loki、grafana轻量日志监控系统

一:简介日志监控告警系统,较为主流的是ELK(Elasticsearch、Logstash和Kibana核心套件构成),虽然优点是功能丰富,允许复杂的操作。但是,这些方案往往规模复杂,资源占用高,...

一文搞懂,WAF阻止恶意攻击的8种方法

WAF(Web应用程序防火墙)是应用程序和互联网流量之间的第一道防线,它监视和过滤Internet流量以阻止不良流量和恶意请求,WAF是确保Web服务的可用性和完整性的重要安全解决方案。它...

14配置appvolume(ios14.6配置文件)

使用AppVolumes应用程序功能,您可以管理应用程序的整个生命周期,包括打包、更新和停用应用程序。您还可以自定义应用程序分配,以向最终用户提供应用程序的特定版本14.1安装appvolume...

目前流行的缺陷管理工具(缺陷管理方式存在的优缺点)

摘自:https://blog.csdn.net/jasonteststudy/article/details/7090127?utm_medium=distribute.pc_relevant.no...

开源数字货币交易所开发学习笔记(2)——SpringCloud

前言码云(Gitee)上开源数字货币交易所源码CoinExchange的整体架构用了SpringCloud,对于经验丰富的Java程序员来说,可能很简单,但是对于我这种入门级程序员,还是有学习的必要的...

开发JAX-RPC Web Services for WebSphere(下)

在开发JAX-RPCWebServicesforWebSphere(上)一文中,小编为大家介绍了如何创建一个Web服务项目、如何创建一个服务类和Web服务,以及部署项目等内容。接下来小编将为大...

CXF学习笔记1(cxf client)

webservice是发布服务的简单并实用的一种技术了,个人学习了CXF这个框架,也比较简单,发布了一些笔记,希望对笔友收藏并有些作用哦1.什么是webServicewebService让一个程序可...

分布式RPC最全详解(图文全面总结)

分布式通信RPC是非常重要的分布式系统组件,大厂经常考察的Dubbo等RPC框架,下面我就全面来详解分布式通信RPC@mikechen本篇已收于mikechen原创超30万字《阿里架构师进阶专题合集》...

Oracle WebLogic远程命令执行0day漏洞(CVE-2019-2725补丁绕过)预警

概述近日,奇安信天眼与安服团队通过数据监控发现,野外出现OracleWebLogic远程命令执行漏洞最新利用代码,此攻击利用绕过了厂商今年4月底所发布的最新安全补丁(CVE-2019-2725)。由...

Spring IoC Container 原理解析(spring中ioc三种实现原理)

IoC、DI基础概念关于IoC和DI大家都不陌生,我们直接上martinfowler的原文,里面已经有DI的例子和spring的使用示例《InversionofControlContainer...

Arthas线上服务器问题排查(arthas部署)

1Arthas(阿尔萨斯)能为你做什么?这个类从哪个jar包加载的?为什么会报各种类相关的Exception?我改的代码为什么没有执行到?难道是我没commit?分支搞错了?遇到问题无法在...

工具篇之IDEA功能插件HTTP_CLENT(idea2021插件)

工具描述:Java开发人员通用的开发者工具IDEA集成了HTTPClient功能,之后可以无需单独安装使用PostMan用来模拟http请求。创建方式:1)简易模式Tools->HTTPCl...

RPC、Web Service等几种远程监控通信方式对比

几种远程监控通信方式的介绍一.RPCRPC使用C/S方式,采用http协议,发送请求到服务器,等待服务器返回结果。这个请求包括一个参数集和一个文本集,通常形成“classname.meth...

《github精选系列》——SpringBoot 全家桶

1简单总结1SpringBoot全家桶简介2项目简介3子项目列表4环境5运行6后续计划7问题反馈gitee地址:https://gitee.com/yidao620/springbo...

取消回复欢迎 发表评论: