当前位置: 首页 > article >正文

kafka:使用flume自定义拦截器,将json文件抽取到kafka的消息队列(topic)中,再从topic中将数据抽取到hdfs上

抽取trans_info.json的数据到kafka上,对其中的tr_flag=0的数据进行过滤抛弃,只保留正常的状态数据: 将此json文件放在集群中的 /home/zidingyi/trans_info.json 目录下

首先先在java代码中自定义拦截器:

1):创建maven项目,在pom文件中导入相关依赖

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.48</version>
    </dependency>
</dependencies>

<!--可以使用maven中的某些打包插件,不仅可以帮助我们打包代码还可以打包所依赖的jar包-->

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <configuration>
                <!-- 禁止生成 dependency-reduced-pom.xml-->
                <createDependencyReducedPom>false</createDependencyReducedPom>
            </configuration>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <relocations>
                            <relocation>
                                <!-- 解决包冲突 进行转换-->
                                <pattern>com.google.protobuf</pattern>
                                <shadedPattern>shaded.com.google.protobuf</shadedPattern>
                            </relocation>
                        </relocations>
                        <artifactSet>
                            <excludes>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <!-- 某些jar包含具有相同文件名的其他资源(例如属性文件)。 为避免覆盖,您可以选择通过将它们的内容附加到一个文件中来合并它们-->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>mainclass</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

自定义拦截器代码:

package com.bigdata;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class zidingyi implements Interceptor {


    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        String body = new String(event.getBody());

        JSONObject jsonObject = JSONObject.parseObject(body);

        //获取json中的值
        int trFlag = jsonObject.getInteger("tr_flag");


        // 如果tr_flag中的值为0,就返回空
        if (trFlag == 0){
            return null;
        }

        return event;

    }

    @Override
    public List<Event> intercept(List<Event> list) {

        ArrayList<Event> filterEvents = new ArrayList<>();
        for (Event event : list) {

            Event intercept = intercept(event);

            if (intercept != null){

                filterEvents.add(intercept);

            }
        }
        return filterEvents;

    }

    @Override
    public void close() {

    }
    public static class BuilderEvent implements Builder{

        @Override
        public Interceptor build() {
            return new zidingyi();
        }

        @Override
        public void configure(Context context) {

        }

    }

}

使用maven打包,生成jar包后上传到flume下的lib目录下

2):上传好jar包后,在flume下的conf中创建了一个myconf文件,创建一个zidinfyi.conf文件,编写flume的conf文件即可(记得使用自定义拦截器)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/zidingyi/trans_info.json

#使用自定义拦截器
a1.sources.s1.interceptors = i1
# type指的是编写java代码所在目录的路径名(我的是在com.bigdata.zidingyi下)
a1.sources.s1.interceptors.i1.type = com.bigdata.zidingyi$BuilderEvent

# 修改sink为kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092
a1.sinks.k1.kafka.topic = zidingyi
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行之前,先在kafka中创建消息队列(topic)中创建一个topic :zidingyi 数据将会导入到这个topic中

创建好后执行conf文件即可

flume-ng agent -c ./ -f zidingyi.conf -n a1 -Dflume.root.logger=INFO,console

可以使用

kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --from-beginning --topic zidingyi

把主题中所有的数据都读取出来(包括历史数据)并且还可以接收来自生产者的新数据

3):将topic中的数据抽取到hdfs中

里面的group.id随便指定即可

执行此conf文件即可

flume-ng agent -c ./ -f zidingyi2.conf -n a1 -Dflume.root.logger=INFO,console
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = zidingyi
a1.sources.r1.kafka.consumer.group.id = donghu

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /zidingyi/ods/clearDate/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp=true

a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollInterval = 0

a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text

数据抽取成功


http://www.kler.cn/a/394806.html

相关文章:

  • CNN和Transfomer介绍
  • 浅谈算法交易
  • SQL server学习09-数据库编程(上)
  • Postman接口测试工具使用详解
  • springboot472基于web网上村委会业务办理系统(论文+源码)_kaic
  • Java - 日志体系_Apache Commons Logging(JCL)日志接口库
  • 麒麟服务器工作站SP1 arm环境qt5.6.3源码编译
  • 如何处理 iOS 客户端内 Webview H5 中后台播放的音视频问题
  • Mac 使用mac 原生工具将mp4视频文件提取其中的 mp3 音频文件
  • git config是做什么的?
  • 如何在 Ubuntu 22.04 上安装 ownCloud
  • 数字IC后端实现之Innovus specifyCellEdgeSpacing和ICC2 set_placement_spacing_rule的应用
  • 低代码可视化-uniapp开关选择组件-低码生成器
  • 理解 C++ 中的 `const` 关键字
  • AI 模型:追求全能还是专精?
  • python 数据类型----可变数据类型
  • 在 RHEL 8 | CentOS Linux release 8.5.2111上安装 Zabbix 6
  • 网上怎么样可以挣钱,分享几种可以让你在家赚钱的兼职项目
  • linux虚拟机无法使用yum在线拉取
  • 开发语言中,堆区和栈区的区别
  • 自动化生成测试用例:利用OpenAI提升电商网站测试覆盖率
  • macOS 设置固定IP
  • 一文详解MacOS使用VSCode搭建SpringBoot+Gradle开发环境
  • HarmonyOS ArkUI(基于ArkTS) 开发布局 (上)
  • F5全新报告揭示AI时代API安全面临严峻挑战
  • 笔记 | image may have poor performance,or fail,if run via emulation