Flink四大基石之CheckPoint
1、State Vs Checkpoint
State:状态,是Flink中某一个Operator在某一个时刻的状态,如maxBy/sum,注意State存的是历史数据/状态,存在内存中。
Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息。
一句话概括: Checkpoint就是State的快照。
2、设置Checkpoint
package com.bigdata.day06;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class _01CheckPointDemo {
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。
System.setProperty("HADOOP_USER_NAME", "root");
// 在这个基础之上,添加快照
// 第一句:开启快照,每隔1s保存一次快照
env.enableCheckpointing(1000);
// 第二句:设置快照保存的位置
env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));
// 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//2. source-加载数据
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] arr = s.split(",");
return Tuple2.of(arr[0], Integer.valueOf(arr[1]));
}
});
//3. transformation-数据处理转换
SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapStream.keyBy(0).sum(1);
result.print();
//4. sink-数据输出
//5. execute-执行
env.execute();
}
}
测试代码效果:启动本地的nc, 启动hdfs服务。
启动代码,发现有权限问题:
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/flink":root:supergroup:drwxr-xr-x
解决方案:
System.setProperty("HADOOP_USER_NAME", "root");
在设置检查点之前,设置一句这样带权限的语句,如果是集群运行,不存在该问题。可以不设置!!!
查看快照情况:
运行,刷新查看checkpoint保存的数据,它会先生成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的情况。
启动HDFS、Flink
[root@hadoop10 app]#start-dfs.sh
[root@hadoop10 app]#start-cluster.sh
数据是保存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:
第一次运行的时候
在本地先clean, 再package ,再Wagon一下:
flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar
flink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar
记得,先启动nc ,再启动任务,否则报错!
通过nc -lk 9999 输入以下内容:
想查看运行结果,可以通过使用的slot数量判断一下:
取消flink job的运行
查看一下这次的单词统计到哪个数字了:
第二次运行的时候
flink run -c 全类名 -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34 /opt/app/flink-test-1.0-SNAPSHOT.jar
启动:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar
-s 指定从checkpoint目录恢复状态数据 ,注意每个人都不一样
从上一次离开时,截止的checkpoint目录
观察数据:输入一个hello,1 得到新的结果hello,8
3、重启策略
重启策略的意义:流式数据是不可能停止的,假如有一条错误数据导致程序直接退出,后面的大量数据是会丢失的,对公司来讲,意义是重大的,损失是惨重的。
重启策略是一个单独的策略,如果你配置了 checkpoint 含有重启策略的,如果你没有 checkpoint 也可以自行配置重启策略,总之重启策略和 checkpoint 没有必然联系。
就是一个流在运行过程中,假如出现了程序异常问题,可以进行重启,比如,在代码中人为添加一些异常:
进行wordcount时,输入了一个bug,1 人为触发异常。
注意:此时如果有checkpoint ,是不会出现异常的,需要将checkpoint的代码关闭,再重启程序。会发现打印了异常,那为什么checkpoint的时候不打印,因为并没有log4j的配置文件,需要搞一个这样的配置文件才行。
程序中添加log4J的代码:
# Global logging configuration
# Debug info warn error
log4j.rootLogger=debug, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapper=TRACE
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
那为什么开启检查点之后,报错了程序还在运行?因为开启检查点之后,程序会进行自动重启(无限重启【程序错了才重启】)。
//开启checkpoint,默认是无限重启,可以设置为不重启
//env.setRestartStrategy(RestartStrategies.noRestart());
//重启3次,重启时间间隔是10s
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
//2分钟内重启3次,重启时间间隔是5s
env.setRestartStrategy(
RestartStrategies.failureRateRestart(3,
Time.of(2,TimeUnit.MINUTES),
Time.of(5,TimeUnit.SECONDS))
);
env.execute("checkpoint自动重启"); //最后一句execute可以设置jobName,显示在8081界面
程序如果上传至服务器端运行,可以看到重启状态
完整代码如下:
package com.bigdata.day06;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
public class Demo02 {
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 代码中不能有checkpoint,不是说checkpoint不好,而是太好了,它已经自带重试机制了。而且是无限重启的
// 通过如下方式可将重试机制关掉
// env.setRestartStrategy(RestartStrategies.noRestart());
//
// 两种办法
// 第一种办法:重试3次,每一次间隔10S
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
// 第二种写法:在2分钟内,重启3次,每次间隔10s
env.setRestartStrategy(
RestartStrategies.failureRateRestart(3,
Time.of(2,TimeUnit.MINUTES),
Time.of(5,TimeUnit.SECONDS))
);
//2. source-加载数据
DataStreamSource<String> streamSource = env.socketTextStream("bigdata01", 8899);
streamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] arr = value.split(",");
String word = arr[0];
if(word.equals("bug")){
throw new Exception("有异常,服务会挂掉.....");
}
// 将一个字符串变为int类型
int num = Integer.valueOf(arr[1]);
// 第二种将字符串变为数字的方法
System.out.println(Integer.parseInt(arr[1]));
Tuple2<String, Integer> tuple2 = new Tuple2<>(word,num);
// 还有什么方法? 第二种创建tuple的方法
Tuple2<String, Integer> tuple2_2 = Tuple2.of(word,num);
return tuple2;
}
}).keyBy(tuple->tuple.f0).sum(1).print();
//3. transformation-数据处理转换
//4. sink-数据输出
//5. execute-执行
env.execute();
}
}
在本地测试,是没有办法看到重试机制的现象的,需要打包上传至集群,特别注意:使用的类名到底是哪一个。
4、savePoint
checkpoint自动完成state快照、savePoint是手动的完成快照。
如果程序在没有设置checkpoint的情况,可以通过savePoint设置state快照
1.提交一个flink job --提交的还是重启策略的代码打成的jar包
flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar
2.输入一些数据,观察单词对应的数字的变化
3.执行savepoint操作
以下是 --> 停止flink job,并且触发savepoint操作
flink stop --savepointPath hdfs://bigdata01:9820/flink-savepoint 152e493da9cdeb327f6cbbad5a7f8e41
后面的序号为Job 的ID
以下是 --> 不会停止flink的job,只是完成savepoint操作
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint
备注:如何正确停止一个 flink 的任务
flink stop 6a27b580aa5c6b57766ae6241d9270ce(任务编号)
4.查看最近完成的flink job对应的savepoint
5.根据之前的savepoint路径,重新启动flink job
flink run -c 全类名 -s hdfs://hadoop10:8020/flink-savepoint/savepoint-79f53c-64b5d94771eb /opt/app/flink-test-1.0-SNAPSHOT.jar
Job has been submitted with JobID 3766ec9ff6f34b46376493a04b50a1f4
再次输入单词,可以看到在之前的基础上累加
另外,在集群中运行我们的程序,默认并行度为1,它不会按照机器的CPU核数,而是按照配置文件中的一个默认值运行的。比如:flink-conf.yaml
web-ui 界面提交作业:
这个图形化界面,跟我们使用如下命令是一个效果:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar