Flink Learning (13): Retry Mechanism and Dimension Table Join
Retry mechanism
By default, when a task throws an exception the whole job fails and stops. The fix is a restart (retry) strategy:
1. Once checkpointing is enabled, the job automatically gets a restart strategy: effectively unlimited restarts.
2. You can also set the restart strategy explicitly yourself.
Setting it in code
// with checkpointing enabled the default is unlimited restarts; this value disables restarting entirely
env.setRestartStrategy(RestartStrategies.noRestart());

// restart the job at most 3 times after a failure, waiting 10s between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

// fail the job once more than 3 failures occur within any 2-minute window; wait 5s between restarts
env.setRestartStrategy(
        RestartStrategies.failureRateRestart(3,
                Time.of(2, TimeUnit.MINUTES),
                Time.of(5, TimeUnit.SECONDS))
);

// effectively unlimited restarts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        Integer.MAX_VALUE,            // number of restart attempts
        Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
));
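To see a strategy in action end to end, here is a minimal runnable sketch. The class name, the socket source, the port, and the "error" trigger are invented purely to provoke a failure; they are not from the original notes. Once a record containing "error" arrives, the map task throws, and the configured strategy decides whether and when the job restarts.
package com.bigdata.day06;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class _02_重启策略demo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint every 10s; with no explicit strategy this alone would mean effectively unlimited restarts
        env.enableCheckpointing(10000);
        // override the default: at most 3 restarts, 10s apart, then the job finally fails
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        env.socketTextStream("localhost", 9999)
                .map(line -> {
                    // throw on a poison record so the restart behavior can be observed
                    if (line.contains("error")) {
                        throw new RuntimeException("boom: " + line);
                    }
                    return line;
                })
                .print();

        env.execute();
    }
}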
Dimension table join
A dimension table join means that records flowing into Flink must be enriched with data held in some external storage before the result can be computed.
A table kept in such an external system is called a dimension table; it might live in MySQL, HBase, and so on.
A dimension table's defining trait is that it changes slowly; such tables are also called lookup tables or dimension tables.
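As a concrete running example, all three variants below read records shaped like ...,city_id,... from a Kafka topic named edu and enrich them from a MySQL table city(city_id, city_name), appending the matching city name to each record.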
Approaches
There are three common ways to implement a dimension table join; each is implemented in full below.
Approach 1:
Preload the data to join against into a collection, either in a static block or in the open() method.
Drawback: the data can no longer change dynamically.
Approach 2:
Initialize the connection in open(), then query MySQL once for every record taken from the stream in map().
Drawback: changes are picked up immediately, but the number of MySQL queries is far too high.
Approach 3:
Put a cache in front of MySQL and re-query only when an entry has expired or is missing.
This keeps the data reasonably fresh while sharply reducing MySQL queries, since the cache absorbs most lookups.
The one remaining issue: with multiple parallel instances, each instance holds its own cache, so the same key may still be fetched from MySQL several times.
Approach 1
package com.bigdata.day06;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Pull the whole dimension table out of MySQL, once.
 * Drawback: the data is fetched a single time in open(), so later changes are never seen.
 */
public class _03_维表join_01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu", new SimpleStringSchema(), properties);
        DataStreamSource<String> source = env.addSource(consumer);

        source.map(new RichMapFunction<String, String>() {

            ComboPooledDataSource pool = null;
            QueryRunner queryRunner = null;
            List<Map<String, Object>> list = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                // run the SQL once, in open()
                pool = new ComboPooledDataSource();
                queryRunner = new QueryRunner(pool);
                String sql = "select * from city";
                list = queryRunner.query(sql, new MapListHandler());
            }

            @Override
            public void close() throws Exception {
                pool.close();
            }

            @Override
            public String map(String line) throws Exception {
                String[] split = line.split(",");
                Object cityName = "unknown";
                for (Map<String, Object> map : list) {
                    String cityId = (String) map.get("city_id");
                    if (cityId.equals(split[1])) {
                        cityName = map.get("city_name");
                    }
                }
                return line + "," + cityName;
            }
        }).print();

        env.execute();
    }
}
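If you want to keep approach one's simplicity but not its frozen snapshot, one option is to re-run the preload on a schedule inside the same RichMapFunction. The sketch below is an assumption of mine, not from the original notes: the refresher field, the 5-minute interval, and the volatile qualifier are all invented for illustration. Only the fields and the open()/close() methods change; map() keeps iterating over list exactly as above.
// extra imports needed for the scheduled refresh
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// replaces the plain `list` field; volatile so map() sees each new snapshot
volatile List<Map<String, Object>> list = null;
ScheduledExecutorService refresher = null;

@Override
public void open(Configuration parameters) throws Exception {
    pool = new ComboPooledDataSource();
    queryRunner = new QueryRunner(pool);
    list = queryRunner.query("select * from city", new MapListHandler());
    // reload the whole table every 5 minutes (the interval is an arbitrary example value)
    refresher = Executors.newSingleThreadScheduledExecutor();
    refresher.scheduleAtFixedRate(() -> {
        try {
            // swap in the new snapshot; readers keep the old one until this assignment
            list = queryRunner.query("select * from city", new MapListHandler());
        } catch (Exception e) {
            e.printStackTrace(); // on failure, keep serving the previous snapshot
        }
    }, 5, 5, TimeUnit.MINUTES);
}

@Override
public void close() throws Exception {
    refresher.shutdown();
    pool.close();
}
Freshness is then bounded by the refresh interval, and every parallel instance still reloads the entire table, so this variant suits small, slowly changing tables.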
Approach 2
package com.bigdata.day06;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Map;
import java.util.Properties;

/**
 * Query MySQL once for every record read from Kafka.
 * Drawback: puts far more load on MySQL.
 */
public class _03_维表join_02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu", new SimpleStringSchema(), properties);
        DataStreamSource<String> source = env.addSource(consumer);

        source.map(new RichMapFunction<String, String>() {

            ComboPooledDataSource pool = null;
            QueryRunner queryRunner = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                pool = new ComboPooledDataSource();
                queryRunner = new QueryRunner(pool);
            }

            @Override
            public void close() throws Exception {
                pool.close();
            }

            @Override
            public String map(String line) throws Exception {
                // run the SQL inside the per-record processing logic
                String[] split = line.split(",");
                String sql = "select city_name from city where city_id = ?";
                Map<String, Object> rs = queryRunner.query(sql, new MapHandler(), split[1]);
                String cityName = "unknown";
                if (rs != null) {
                    cityName = (String) rs.get("city_name");
                }
                return line + "," + cityName;
            }
        }).print();

        env.execute();
    }
}
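A side note on approach two: besides the load on MySQL, every map() call blocks the stream until its query returns. Flink ships an async I/O operator (AsyncDataStream with an AsyncFunction) that hides that latency by keeping many lookups in flight; it does not reduce the number of queries. Below is a sketch under the same record format and city table as above; the class name, thread-pool size, timeout, and capacity values are my own illustrative choices, not from the original notes.
package com.bigdata.day06;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * The lookup from approach two wrapped in Flink's async I/O.
 * JDBC itself is blocking, so each query is handed to a small private thread pool.
 */
public class _03_维表join_02_async extends RichAsyncFunction<String, String> {

    transient ComboPooledDataSource pool;
    transient QueryRunner queryRunner;
    transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) throws Exception {
        pool = new ComboPooledDataSource();
        queryRunner = new QueryRunner(pool);
        executor = Executors.newFixedThreadPool(10); // pool size is an example value
    }

    @Override
    public void close() throws Exception {
        executor.shutdown();
        pool.close();
    }

    @Override
    public void asyncInvoke(String line, ResultFuture<String> resultFuture) {
        executor.submit(() -> {
            try {
                String[] split = line.split(",");
                Map<String, Object> rs = queryRunner.query(
                        "select city_name from city where city_id = ?",
                        new MapHandler(), split[1]);
                String cityName = (rs != null) ? (String) rs.get("city_name") : "unknown";
                // exactly one result per input record
                resultFuture.complete(Collections.singleton(line + "," + cityName));
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
    }
}
In the main method you would then replace source.map(...) with AsyncDataStream.unorderedWait(source, new _03_维表join_02_async(), 5, TimeUnit.SECONDS, 100).print(). unorderedWait emits results as lookups complete; orderedWait preserves input order at the cost of extra buffering.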
Approach 3
package com.bigdata.day06;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
// the shaded guava package name (guava18 here) depends on the Flink version
import org.apache.flink.shaded.guava18.com.google.common.cache.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * The final and best of the three approaches:
 * check the in-memory cache first and only go to MySQL on a miss.
 * The one remaining issue: each parallel instance keeps its own cache,
 * so with parallelism > 1 the same key may be loaded from MySQL more than once.
 */
public class _03_维表join_03_cache {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // remember to set the parallelism: every parallel instance builds its own cache
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu", new SimpleStringSchema(), properties);
        DataStreamSource<String> source = env.addSource(consumer);

        source.map(new RichMapFunction<String, String>() {

            ComboPooledDataSource pool = null;
            QueryRunner queryRunner = null;
            // a Guava LoadingCache: the key type is what you look up with (city_id),
            // the value type is what load() returns (city_name)
            LoadingCache<String, String> cache;

            @Override
            public void open(Configuration parameters) throws Exception {
                pool = new ComboPooledDataSource();
                queryRunner = new QueryRunner(pool);
                cache = CacheBuilder.newBuilder()
                        // cap on the number of entries; beyond it, least-recently-used entries are evicted (LRU)
                        .maximumSize(1000)
                        // entries expire 50s after being written; expiry is applied lazily,
                        // when the expired key is touched again, not by a background thread
                        .expireAfterWrite(50, TimeUnit.SECONDS)
                        // removal listener: called when an entry is evicted or expires
                        .removalListener(new RemovalListener<String, String>() {
                            @Override
                            public void onRemoval(RemovalNotification<String, String> removalNotification) {
                                System.out.println(removalNotification.getKey() + " was removed, value: " + removalNotification.getValue());
                            }
                        })
                        // the loader: runs on a cache miss, and its result is stored in the cache automatically
                        .build(new CacheLoader<String, String>() {
                            @Override
                            public String load(String cityId) throws Exception {
                                String sql = "select city_name from city where city_id = ?";
                                Map<String, Object> rs = queryRunner.query(sql, new MapHandler(), cityId);
                                // Guava's LoadingCache rejects null values, so fall back to a placeholder
                                String cityName = "unknown";
                                if (rs != null && rs.get("city_name") != null) {
                                    cityName = (String) rs.get("city_name");
                                }
                                System.out.println("queried MySQL: " + cityId + " -> " + cityName);
                                return cityName;
                            }
                        });
            }

            @Override
            public void close() throws Exception {
                pool.close();
            }

            @Override
            public String map(String line) throws Exception {
                String[] arr = line.split(",");
                // read through the cache; a miss triggers load() above
                String cityName = cache.get(arr[1]);
                return line + "," + cityName;
            }
        }).print();

        env.execute();
    }
}
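A final design note: the cache is an ordinary field of the RichMapFunction, so each parallel subtask builds and fills its own copy. That is why the example pins the parallelism to 1; at higher parallelism a cold key can be fetched from MySQL once per subtask. Within a single subtask, though, Guava's LoadingCache collapses concurrent loads of the same key into a single call, so the duplicate-lookup caveat is about parallel instances rather than threads sharing one cache.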