当前位置: 首页 > article >正文

【flink】之如何消费kafka数据并读写入redis?

背景: 

最近公司出现做了一个新需求,需求内容是加工一个营销时机,但是加工营销时机的同时需要把数据内容里的一个idmapping存入redis用于后续的读写。

准备: 

    <!-- 依赖 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>3.2.0-1.19</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>1.19.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.1.5</version>
    </dependency>
  </dependencies>

代码:

package com.iterge.flink;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * Hello world!
 *
 */
@Slf4j
public class FlinkDemo {
    //创建连接池
    static final JedisPool pool = new JedisPool("127.0.0.0",8423);
    //创建redis客户端
    static final Jedis jedis = pool.getResource();

    public static void main( String[] args ) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //DataStreamSource<String> stringDataStreamSource = env.fromData(Arrays.asList("1", "2", "3"));
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("it.erge.test.topic")
                .setGroupId("it.erge.test.topic.1")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stringDataStreamSource.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                //读redis
                System.out.println("test="+jedis.get("test"));
                //写redis
                jedis.setex("test",60,s);
                return s;
            }
            @Override
            public void close() throws Exception {
                super.close();
                jedis.close();
            }
        });
        stringDataStreamSource.print();
        env.execute("test");
    }
}


http://www.kler.cn/news/331773.html

相关文章:

  • chatgpt用于数据分析的弊端
  • c#代码介绍23种设计模式_16迭代器模式
  • 使用 npkill 快速清理本地 node_modules 文件
  • android 全面屏最底部栏沉浸式
  • LLM | llama.cpp 安装使用(支持CPU、Metal及CUDA的单卡/多卡推理)
  • Unity实战案例全解析:RTS游戏的框选和阵型功能(5)阵型功能 优化
  • Spring框架:Spring Core、Spring AOP、Spring MVC、Spring Boot、Spring Cloud等组件的基本原理及使用
  • 指针学习笔记
  • 安卓使用memtester进行内存压力测试
  • Spring Boot框架:新闻推荐系统开发新趋势
  • 检索器--
  • Keepalived+MySQL 高可用集群
  • 【QT Quick】基础语法:基础类与控件
  • 汽车追尾为什么是后车的责任?
  • ip 地址查看cmd命令
  • Mysql(索引与事务)
  • 用ChatGPT做数据分析与挖掘,爽!
  • 使用transformers调用owlv2实现开放目标检测
  • 数据结构:并查集
  • Axure大屏可视化模板在不同领域中的实际应用案例