当前位置: 首页 > article >正文

【flink】之集成mybatis对mysql进行读写

背景: 

在现代大数据应用中,数据的高效处理和存储是核心需求之一。Flink作为一款强大的流处理框架,能够处理大规模的实时数据流,提供丰富的数据处理功能,如窗口操作、连接操作、聚合操作等。而MyBatis则是一款优秀的持久层框架,能够简化数据库操作,提高开发效率。将这两者结合使用,可以实现高效的数据处理和存储。

介绍:

MyBatis简介

MyBatis是一款基于Java的持久层框架,它可以使用XML配置文件或注解来定义数据库操作。MyBatis提供了简单的API来执行SQL语句,以及更高级的API来处理复杂的数据库操作。其核心是SQL映射,可以将关系型数据库的表映射到Java对象中,从而实现对数据库的操作。此外,MyBatis还提供了一些高级功能,如动态SQL、缓存等,以提高开发效率和性能。

Flink简介

Flink是一款流处理框架,可以处理大规模的实时数据流。Flink支持各种数据源和数据接收器,如Kafka、HDFS、TCP等。Flink的核心是流计算模型,可以实现对数据流的有状态计算,从而实现对实时数据的处理。Flink提供了丰富的数据处理功能,如窗口操作、连接操作、聚合操作等,以满足不同的应用需求。

目的:

Flink集成MyBatis的目的

Flink集成MyBatis的主要目的是将MyBatis作为Flink的数据源,通过Flink处理实时数据流,实现高效的数据处理和存储。使用MyBatis定义数据库操作,以实现高效的数据存储和查询;使用Flink处理实时数据流,以实现高效的数据处理和分析。

准备:

添加依赖
    <!--添加spring依赖-->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>5.2.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aop</artifactId>
      <version>5.2.2.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aspects</artifactId>
      <version>5.2.2.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>5.2.2.RELEASE</version>
    </dependency>

    <!--添加mybatis相关依赖-->
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis</artifactId>
      <version>3.5.4</version>
    </dependency>

    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis-spring</artifactId>
      <version>2.0.7</version>
    </dependency>

    <!--添加连接池和mysql驱动依赖-->
    <dependency>
      <groupId>com.zaxxer</groupId>
      <artifactId>HikariCP</artifactId>
      <version>3.4.5</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-api</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>

    <!-- 加上这个才能辨认到*.yml文件 如果配置文件不使用yaml,则不需要引用此依赖-->
    <dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
      <artifactId>jackson-dataformat-yaml</artifactId>
      <version>2.17.2</version>
    </dependency>

 代码示例:

配置文件设置

config.properties文件配置


local.url=jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=ROUND&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull
local.username=root
local.password=
local.maximumPoolSize=10

或者配置yml文件,(二选其一)如下:

local:
  url: jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=ROUND&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull
  username: root
  password:
  maximumPoolSize: 5
配置文件加载
package com.iterge.flink.utils;


import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;

import java.io.IOException;
import java.util.Properties;
import java.util.Set;

/**
 * @author iterge
 * @version 1.0
 * @date 2024/10/18 14:34
 * @description spring环境初始化
 */

public class SpringEnv {

    private static volatile boolean inited = false;
    //配置文件地址
    private static final String applicationLocation = "/application.yml";

    public static void init() {
        if (!inited) {
            System.out.println("...........................spring init start ...........................");
            //加载配置文件
            AnnotationConfigApplicationContext springContext = new AnnotationConfigApplicationContext();
            springContext.scan("com.iterge.flink");
            springContext.refresh();
            System.out.println("...........................spring init end ...........................");

            System.out.println("...........................config init start ...........................");
            //loadProperties();
            loadYamlProperties();
            System.out.println("...........................config init start ...........................");

            inited = true;
        }
    }

    /**
     * 加载配置文件
     */
    private static void loadProperties() {
        try {
            Resource resource = new ClassPathResource(applicationLocation);
            Properties properties = PropertiesLoaderUtils.loadProperties(resource);
            Set<String> keys = properties.stringPropertyNames();
            for (String key : keys) {
                System.setProperty(key, properties.getProperty(key));
            }
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * 加载yml文件
     */
    private static void loadYamlProperties() {
        try {
            Resource resource = new ClassPathResource(applicationLocation);
            YamlPropertiesFactoryBean yamlPropertiesFactoryBean = new YamlPropertiesFactoryBean();
            yamlPropertiesFactoryBean.setResources(resource);
            Properties properties = yamlPropertiesFactoryBean.getObject();
            assert properties != null;
            Set<String> keys = properties.stringPropertyNames();
            for (String key : keys) {
                System.setProperty(key, properties.getProperty(key));
            }
        }catch (Exception e){
            throw new RuntimeException(e.getMessage());
        }
    }
}
数据源配置&加载
package com.iterge.flink.datasource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import javax.sql.DataSource;

/**
 * @author iterge
 * @version 1.0
 * @date 2024/10/12 15:33
 * @description 本地数据源加载配置
 */

@Configuration
@Lazy
@MapperScan(basePackages = "com.iterge.flink.mapper"
        ,sqlSessionFactoryRef = "localDataSourceSqlSessionFactory"
        ,lazyInitialization = "true")
public class LocalDatasourceConfig {

    @Value("${local.url}")
    private String url;
    @Value("${local.username}")
    private String user;
    @Value("${local.password}")
    private String password;
    @Value("${local.maximumPoolSize:10}")
    private Integer maxPoolSize;


    @Bean("localDataSource")
    public DataSource localDataSource() {
        return DataSourceHelper.createDataSource(url, user, password, "localDataSource", 5, maxPoolSize);
    }

    @Bean("localDataSourceSqlSessionFactory")
    public SqlSessionFactory localDataSourceSqlSessionFactory(
            @Qualifier("localDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        // mapper的xml形式文件位置必须要配置,不然将报错:no statement (这种错误也可能是mapper的xml中,namespace与项目的路径不一致导致)
        bean.setMapperLocations(
                new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
        return bean.getObject();
    }
}
package com.iterge.flink.datasource;


import com.zaxxer.hikari.HikariDataSource;

/**
 * @author iterge
 * @version 1.0
 * @date 2024/10/12 15:44
 * @description 数据源创建工具
 */
public class DataSourceHelper {

    public static HikariDataSource createDataSource(String jdbcUrl,
                                                    String user,
                                                    String password,
                                                    String poolName,
                                                    Integer minIdle,
                                                    Integer maxPoolSize) {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        dataSource.setJdbcUrl(jdbcUrl);
        dataSource.setUsername(user);
        dataSource.setPassword(password);
        dataSource.setIdleTimeout(120000);
        dataSource.setMinimumIdle(minIdle);
        dataSource.setMaximumPoolSize(maxPoolSize);
        dataSource.setMaxLifetime(600000);
        dataSource.setRegisterMbeans(false);
        dataSource.setConnectionTimeout(2000);
        dataSource.setPoolName(poolName);

        return dataSource;
    }

}
创建实体类
package com.iterge.flink.entity;

import lombok.Data;

/**
 * @author iterge
 * @date 2024/10/12 16:00:50
 */

@Data
public class User {
    private Integer id;
    private String name;
}
创建mapper
package com.iterge.flink.mapper;


import com.iterge.flink.entity.User;
import org.apache.ibatis.annotations.Mapper;

/**
 * @author iterge
 * @version 1.0
 * @date 2024/10/12 15:59
 * @description 用户对象dao
 */

@Mapper
public interface UserMapper {

    int insertOne(User user);

}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.iterge.flink.mapper.UserMapper">

    <insert id="insertOne" keyProperty="id" useGeneratedKeys="true" parameterType="com.iterge.flink.entity.User">
        insert into t_user(name) values(#{name})
    </insert>

</mapper>
上下文获取工具
package com.iterge.flink.utils;


import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author iterge
 * @version 1.0
 * @date 2024/10/12 16:20
 * @description 上下文文获取工具
 */

@Slf4j
@Component
public class ContextUtil implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        ContextUtil.applicationContext = context;
    }

    public static ApplicationContext getContext() {
        return applicationContext;
    }

    public static Object getBean(String name) {
        if (getContext() == null) {
            log.error("spring context can not be found");
            return null;
        }
        if (StringUtils.isBlank(name)) {
            log.error("bean name can not be null");
            return false;
        }
        return getContext().getBean(name);
    }
}
创建flink任务
package com.iterge.flink.job;

import com.iterge.flink.entity.User;
import com.iterge.flink.mapper.UserMapper;
import com.iterge.flink.utils.ContextUtil;
import com.iterge.flink.utils.SpringEnv;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 *
 * @author FlinkMybatisDemo
 * @date 2024/10/12 11:17
 * @version 1.0
 * @description 整合mybatis
 *
*/

@Slf4j
public class FlinkMybatisDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("it.erge.test.topic")
                .setGroupId("it.erge.test.topic.1")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        SingleOutputStreamOperator<String> process = stringDataStreamSource.process(new ProcessFunction<String, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                SpringEnv.init();
            }

            @Override
            public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {
                log.info("message={}",s);
                User u = new User();
                u.setName(s);
                UserMapper mapper = ContextUtil.getContext().getBean(UserMapper.class);
                mapper.insertOne(u);
                collector.collect(s);
            }
        });
        process.print();
        env.execute("mybatis-demo");
    }
}

代码地址:

GitCode - 全球开发者的开源社区,开源代码托管平台


http://www.kler.cn/a/370935.html

相关文章:

  • VUE学习笔记4__安装开发者工具
  • Rust 强制类型转换和动态指针类型的转换
  • 读书笔记~管理修炼-风险性决策:学会缩小风险阈值
  • JS-Web API-day02
  • 【C++】B2112 石头剪子布
  • TCP 连接状态标识 | SYN, FIN, ACK, PSH, RST, URG
  • WPF的行为(Behavior)
  • 【vs2022】windows可用的依赖预编译库
  • Nginx流量同时转发多后端(流量镜像分发)
  • WPF+MVVM案例实战(十一)- 环形进度条实现
  • Flink CDC系列之:学习理解核心概念——Data Sink
  • RCNN系列算法
  • ES、BOM 和 DOM
  • java-web-day6-下-知识点小结
  • Android性能优化之2个帧率和卡顿监控方案(附实现代码)
  • 【网页内嵌PDF下载】PDF.js妙用
  • MFC tcpclient
  • STM32主从定时器输出个数、频率可调的脉冲
  • 知名数字中国战略布局与新质生产力培训师讲师培训讲师唐兴通数字经济数字化转型专家教授学者大数据AIGC大模型智能化战略数字时代商业模式创新
  • Hana 到 PostgreSQL 数据迁移同步
  • QT 从ttf文件中读取图标
  • 使用命令行自动生成markdown文档目录
  • 针对初学者的PyTorch项目推荐
  • 【论文阅读】Real-ESRGAN
  • 拥塞控制与TCP子问题(粘包问题,异常情况等)
  • OpenHarmony4.0配置应用开机自启