当前位置: 首页 > article >正文

flink学习(13)—— 重试机制和维表join

重试机制

当任务出现异常的时候,会直接停止任务——解决方式,重试机制

1、设置checkpoint后,会给任务一个重启策略——无限重启

2、可以手动设置任务的重启策略

代码设置

//开启checkpoint后,默认是无限重启,可以设置该值 表示不重启
env.setRestartStrategy(RestartStrategies.noRestart());


//作业失败flink中最多重启3次,每次重启的最小间隔是10s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

//2分钟内最多重启3次,每次重启的最小间隔是5秒
env.setRestartStrategy(
    RestartStrategies.failureRateRestart(3,
                                         Time.of(2,TimeUnit.MINUTES),
                                         Time.of(5,TimeUnit.SECONDS))
);

//无限重启
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    Integer.MAX_VALUE,  // 无限重启次数
    Time.of(10, TimeUnit.SECONDS)  // 每次重启的延迟时间
));

维表join

所谓的维表Join: 进入Flink的数据,需要关联另外一些存储设备的数据,才能计算出来结果

那么存储在外部设备上的表称之为维表,可能存储在mysql也可能存储在hbase 等。

维表一般的特点是变化比较慢。——名词表,维度表。

解决方式

 解决维表join的方式
        方式一:
            可以用一个静态代码块,或者在open方法中对一个集合初始化,用于存放想要相关联的数据。
            缺点:数据不能动态改变了
        方式二:
            在open中初始化连接,在map中每拿到流中的一条数据,就去mysql中查找一次
            缺点:数据可以动态改变,但是去mysql查找的次数太多了
        方式三:
            创建一个缓存区,用于存放数据,若过期则再去mysql中查询数据。
            没有缺点,可以动态获取数据了,也减少了mysql的查询次数(缓冲)
            唯一的是,若是多线程,可能会去mysql查询多次

方式一

package com.bigdata.day06;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * 直接从mysql中拿出
 * 弊端 只能拿到一次 不能实现动态
 */
public class _03_维表join_01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);
        DataStreamSource<String> source = env.addSource(consumer);

        source.map(new RichMapFunction<String, String>() {
            ComboPooledDataSource pool = null;
            QueryRunner queryRunner = null;
            List<Map<String, Object>> list = null;
            @Override
            public void open(Configuration parameters) throws Exception {
            // 在open中执行sql
                pool = new ComboPooledDataSource();
                queryRunner = new QueryRunner(pool);
                String sql = "select * from city ";
                list = queryRunner.query(sql, new MapListHandler());

            }

            @Override
            public void close() throws Exception {
                pool.close();
            }

            @Override
            public String map(String line) throws Exception {
                String[] split = line.split(",");
                Object cityName = "未知";
                for (Map<String, Object> map : list) {
                    String cityId = (String)map.get("city_id");
                    if (cityId.equals(split[1])){
                        cityName = map.get("city_name");
                    }
                }

                return line+","+cityName;
            }
        }).print();

        env.execute();
    }
}

方式二

package com.bigdata.day06;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Map;
import java.util.Properties;

/**
 * 每次从kafka中拿到一条数据就从mysql中查一遍
 * 弊端 对mysql的压力加大
 */
public class _03_维表join_02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);
        DataStreamSource<String> source = env.addSource(consumer);

        source.map(new RichMapFunction<String, String>() {
            ComboPooledDataSource pool = null;
            QueryRunner queryRunner = null;
            @Override
            public void open(Configuration parameters) throws Exception {
                pool = new ComboPooledDataSource();
                queryRunner = new QueryRunner(pool);
            }

            @Override
            public void close() throws Exception {
                pool.close();
            }

            @Override
            public String map(String line) throws Exception {
            // 在处理逻辑中执行sql
                String[] split = line.split(",");
                String sql = "select city_name from city where city_id = ?";
                Map<String, Object> rs = queryRunner.query(sql, new MapHandler(), split[1]);
                String cityName="未知";
                if (rs !=null){
                     cityName = (String) rs.get("city_name");
                }

                return line+","+cityName;
            }
        }).print();

        env.execute();
    }
}

方式三

package com.bigdata.day06;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * 最终 非常好的方式
 * 现在内存中查 查不到在去mysql中找
 * 唯一的问题是,假如是多线程情况下,可能会触发多次去mysql中查找的方法
 */
public class _03_维表join_03_cache {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);
        DataStreamSource<String> source = env.addSource(consumer);
        // 记得设置并行度
        env.setParallelism(1);

        source.map(new RichMapFunction<String, String>() {
            ComboPooledDataSource pool = null;
            QueryRunner queryRunner = null;

            // 定义一个Cache
            // 第一个是传入的参数类型 第二个是存放的值的类型
            // 也就是,传入一个参数,根据这个值获取结果,拿的时候通过传入的值 拿存放的值
            LoadingCache<String, String> cache;
            @Override
            public void open(Configuration parameters) throws Exception {
                pool = new ComboPooledDataSource();
                queryRunner = new QueryRunner(pool);

                cache = CacheBuilder.newBuilder()
                        //最多缓存个数,超过了就根据最近最少使用算法来移除缓存 LRU
                        .maximumSize(1000)
                        //在更新后的指定时间后就回收
                        // 不会自动调用,而是当过期后,又用到了过期的key值数据才会触发的。
                        .expireAfterWrite(50, TimeUnit.SECONDS)
                        //指定移除通知
                        .removalListener(new RemovalListener<String, String>() {
                            @Override
                            public void onRemoval(RemovalNotification<String, String> removalNotification) {
                                System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());
                            }
                        })
                        .build(//指定加载缓存的逻辑
                                new CacheLoader<String, String>() {
                                    // 假如缓存中没有数据,会触发该方法的执行,并将结果自动保存到缓存中
                                    @Override
                                    public String load(String cityId) throws Exception {

                                        String sql = "select city_name from city where city_id = ? ";
                                        Map<String, Object> rs = queryRunner.query(sql, new MapHandler(), cityId);
                                        String cityName = null;
                                        if (rs!=null){
                                            cityName = (String) rs.get("city_name");
                                        }
                                        System.out.println("进入数据库查询成功,查询的值为"+cityId+"--"+cityName);
                                        return cityName;
                                    }
                                });

            }

            @Override
            public void close() throws Exception {
                pool.close();
            }

            @Override
            public String map(String line) throws Exception {
                String[] arr = line.split(",");
                // 使用这种方式取值
                String cityName = cache.get(arr[1]);
                return line+","+cityName;
            }
        }).print();

        env.execute();
    }
}


http://www.kler.cn/a/420089.html

相关文章:

  • 2024年12月3日Github流行趋势
  • 深入探讨锁升级问题
  • 用Python做数据分析环境搭建及工具使用(Jupyter)
  • 机器学习:精确率与召回率的权衡
  • 第七课 Unity编辑器创建的资源优化_UI篇(UGUI)
  • 微服务搭建----springboot接入Nacos2.x
  • 在 uniapp 项目中使用 Iconify 字体图标库
  • 《Python PDF 格式转换全攻略》
  • Linux 进程管理详解
  • 张量并行和流水线并行在Transformer中的具体部位
  • 25.4K Star 高效内存数据存储!特别好用的Redis 和 Memcached 替代品:Dragonfly!
  • redisson-spring-data与Spring-Data-Redis的版本关系问题
  • 性能监控系统Prometheus、Node-exporter与Grafana部署详解搭建
  • 黑马程序员Java项目实战《苍穹外卖》Day03
  • Xilinx PCIe高速接口入门实战(一)
  • 软件保护:从用户角度出发的安全需求与体验
  • C++之 String 类的模拟实现
  • k8s api对象,CRD
  • linux 操作系统环境配置 redhat9
  • 如何利用微型5G网关为智慧无人矿车提供精确定位
  • pytest(一)csv数据驱动
  • AI开发 - GPT之魂 用Python 演示chatGPT的自注意力机制 - 机器学习
  • JavaScript根据数据生成柱形图
  • 大数据Hadoop实战:从基础到应用
  • STM407IGT6+WS2818灯带
  • ubuntu部署RocketMQ