Flume, Part 2: Custom Interceptors, Selectors, Automatic Failover, and Load Balancing
Contents
I. Custom Interceptors (important)
II. Selectors
1. Replicating selector demo
2. Multiplexing selector
III. Automatic Failover
IV. Load Balancing
I. Custom Interceptors (important)
Sample input data:
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[
{
"item_type":"eat",
"
active_time":156234
},
{
"item_type":"car",
"active_time":156233
}
]
}'
Expected output:
[{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"},
{"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}]
1. Create a Maven project and add the required dependencies (pom.xml):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bigdata</groupId>
<artifactId>MyInterceptor</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.48</version>
</dependency>
</dependencies>
<!-- The shade packaging plugin bundles our own code together with its dependency jars into one jar -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<configuration>
<!-- Do not generate dependency-reduced-pom.xml -->
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<!-- Relocate to resolve package conflicts -->
<pattern>com.google.protobuf</pattern>
<shadedPattern>shaded.com.google.protobuf</shadedPattern>
</relocation>
</relocations>
<artifactSet>
<excludes>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- Some jars contain resources with identical file names (e.g. properties files). To avoid them overwriting each other, merge them by appending their contents into one file -->
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>mainclass</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
The packaging plugin sits under build → plugins → plugin, exactly as shown in the pom above.
Back to the requirement: flatten the nested JSON shown at the top of this section into the list of per-item records in the expected output.
Code:
First, test the JSON handling on its own.
Parsing JSON in Java (JSON → entity) and serializing back (entity → JSON string) both require a library,
such as Jackson or fastjson (from Alibaba); fastjson is used here.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TestJson {
public static void main(String[] args) {
String log="{\n" +
" 'host':'www.baidu.com',\n" +
" 'user_id':'13755569427',\n" +
" 'items':[\n" +
" {\n" +
" 'item_type':'eat',\n" +
" 'active_time':156234\n" +
" },\n" +
" {\n" +
" 'item_type':'car',\n" +
" 'active_time':156233\n" +
" }\n" +
" ]\n" +
"}";
JSONObject jsonObject = JSON.parseObject(log);
String host = jsonObject.getString("host");
String user_id = jsonObject.getString("user_id");
System.out.println(host);
System.out.println(user_id);
JSONArray items = jsonObject.getJSONArray("items");
List<Map<String, String>> list = new ArrayList<>();
for (Object item : items) {
// {"active_time":156234,"item_type":"eat"}
Map<String, String> map = new HashMap<>();
String itemStr = item.toString();
JSONObject jsonItem = JSON.parseObject(itemStr);
String active_time = jsonItem.getString("active_time");
String item_type = jsonItem.getString("item_type");
System.out.println(active_time);
System.out.println(item_type);
map.put("active_time",active_time);
map.put("user_id",user_id);
map.put("item_type",item_type);
map.put("host",host);
list.add(map);
}
/**
 * Needs to be converted into:
 * [{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"},
 *  {"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}]
 */
String jsonString = JSON.toJSONString(list);
System.out.println(jsonString);
}
}
package com.bigdata;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public class DemoInterceptor implements Interceptor {
@Override
public void initialize() {
}
// Only this method's implementation really matters
/**
* Requirement:
* log='{
* "host":"www.baidu.com",
* "user_id":"13755569427",
* "items":[
* {
* "item_type":"eat",
* "active_time":156234
* },
* {
* "item_type":"car",
* "active_time":156233
* }
* ]
* }'
*
* Needs to be converted into:
* [{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"},
* {"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}]
*/
@Override
public Event intercept(Event event) {
// Parse the text body and rewrite it in the target format
byte[] body = event.getBody();
String content = new String(body);
/**
* {
* "host":"www.baidu.com",
* "user_id":"13755569427",
* "items":[
* {
* "item_type":"eat",
* "active_time":156234
* },
* {
* "item_type":"car",
* "active_time":156233
* }
* ]
* }
*/
// Parse the JSON string into a JSON object
JSONObject jsonObject = JSON.parseObject(content);
// Read scalar values from the object
String host = jsonObject.getString("host");
String user_id = jsonObject.getString("user_id");
// Read the JSON array from the object
JSONArray items = jsonObject.getJSONArray("items");
// Collect the results as one map per item
ArrayList<HashMap<String, String>> list = new ArrayList<>();
for (Object object: items) {
String obj = object.toString();
JSONObject jobj = JSON.parseObject(obj);
String item_type = jobj.getString("item_type");
String active_time = jobj.getString("active_time");
HashMap<String, String> map = new HashMap<>();
map.put("active_time",active_time);
map.put("item_type",item_type);
map.put("host",host);
map.put("user_id",user_id);
list.add(map);
}
// Serialize the list back to a JSON string
String s = JSON.toJSONString(list);
event.setBody(s.getBytes());
return event;
}
// This method delegates to the single-event method above
@Override
public List<Event> intercept(List<Event> list) {
for (int i=0;i<list.size();i++) {
Event oldEvent = list.get(i);
Event newEvent = intercept(oldEvent);
list.set(i,newEvent);
}
return list;
}
@Override
public void close() {
}
// Its only job is to instantiate the custom interceptor
public static class BuilderEvent implements Builder{
@Override
public Interceptor build() {
return new DemoInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
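Before packaging, it helps to run a quick local sanity check of the interceptor from the IDE. The class below is a minimal sketch I've added for that purpose (the class name is my own, not from the original post); it relies only on flume-ng-core's EventBuilder, which is already a dependency.
package com.bigdata;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;
public class DemoInterceptorLocalTest {
    public static void main(String[] args) {
        // Same nested record as the sample input, on a single line
        String log = "{\"host\":\"www.baidu.com\",\"user_id\":\"13755569427\",\"items\":"
                + "[{\"item_type\":\"eat\",\"active_time\":156234},"
                + "{\"item_type\":\"car\",\"active_time\":156233}]}";
        // Build the interceptor the same way Flume would: through its Builder
        Interceptor interceptor = new DemoInterceptor.BuilderEvent().build();
        interceptor.initialize();
        Event event = EventBuilder.withBody(log.getBytes());
        Event result = interceptor.intercept(event);
        // Should print the flattened JSON array from the expected output above
        System.out.println(new String(result.getBody()));
        interceptor.close();
    }
}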
Package the project (mvn package runs the shade goal) and upload the resulting jar to Flume's lib directory.
Test:
Write a Flume configuration file, testInter.conf:
a1.sources = s1
a1.channels = c1
a1.sinks = r1
a1.sources.s1.type = TAILDIR
# Space-separated list of file groups; each file group is a set of files to tail
a1.sources.s1.filegroups = f1
# Absolute path of the file group
a1.sources.s1.filegroups.f1=/home/b.log
# Use the custom interceptor
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = com.bigdata.DemoInterceptor$BuilderEvent
a1.channels.c1.type = file
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.r1.type = hdfs
a1.sinks.r1.hdfs.path = /flume/202409
a1.sinks.r1.hdfs.fileSuffix= .log
# Write the uploaded data as plain text so it is easy to inspect
a1.sinks.r1.hdfs.fileType=DataStream
a1.sinks.r1.hdfs.writeFormat=Text
a1.sources.s1.channels = c1
a1.sinks.r1.channel = c1
Run the agent:
flume-ng agent -c ./ -f testInter.conf -n a1 -Dflume.root.logger=INFO,console
If an error about a missing fastjson class appears at this point, it means fastjson was not bundled into our custom jar when it was packaged; the fix is to supply the jar by hand:
copy the fastjson jar into flume/lib.
If you used the shade plugin and fastjson was already packed into your jar, this step is unnecessary.
For reference, each record written to b.log is a single-line JSON string like:
{"host":"www.baidu.com","user_id":"13755569427","items":[{"item_type":"eat","active_time":156234},{"item_type":"car","active_time":156233}]}
Next, run the test; Flume must already be started.
Write a small script that simulates JSON data being continuously appended to b.log.
#!/bin/bash
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[
{
"item_type":"eat",
"active_time":156234
},
{
"item_type":"car",
"active_time":156233
}
]
}'
echo $log >> /home/b.log
Save it and make it executable:
chmod 777 createJson.sh
Run the script (repeatedly) to keep feeding data into b.log:
./createJson.sh
How to write a custom timestamp interceptor (just add the following to the intercept method):
HashMap<String, String> hashMap = new HashMap<>();
hashMap.put("timestamp",System.currentTimeMillis()+"");
event.setHeaders(hashMap);
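Expanded into a complete class, a timestamp interceptor could look like the following minimal sketch (the class name is my own; it follows the same Builder pattern as DemoInterceptor above). Note that it writes into the event's existing header map rather than replacing it, so headers set by other interceptors are preserved.
package com.bigdata;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
import java.util.Map;
public class TimestampHeaderInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }
    @Override
    public Event intercept(Event event) {
        // Stamp the current time; HDFS sinks can then use %Y-%m-%d style paths
        // without needing useLocalTimeStamp=true
        Map<String, String> headers = event.getHeaders();
        headers.put("timestamp", System.currentTimeMillis() + "");
        return event;
    }
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event e : events) {
            intercept(e);
        }
        return events;
    }
    @Override
    public void close() {
    }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampHeaderInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }
}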
II. Selectors
When one source feeds multiple channels, whether those channels receive the same data depends on the selector in use. The default is the replicating selector; a multiplexing selector can be configured explicitly instead.
1. Replicating selector demo
Write a Flume config with one source, two channels, and two sinks:
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2
# avro http syslogtcp
# avro avro-client
# http curl
# syslogtcp nc
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 7777
# Selector type: replicating
a1.sources.r1.selector.type=replicating
a1.channels.c1.type=memory
a1.channels.c2.type=memory
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y-%m-%d/rep
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=/flume/%Y-%m-%d/rep
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
Start the agent:
flume-ng agent -c ./ -f syslogtcp-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
Send a message to port 7777 on bigdata01:
echo "hello world" | nc bigdata01 7777
If the nc command is not found, install it first: yum install -y nc
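If nc is not available at all, the same test message can be pushed with a plain TCP client. The sketch below is an assumed equivalent of the nc command (hostname, port, and message come from the config and command above).
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
public class SyslogTcpClient {
    public static void main(String[] args) throws Exception {
        // Equivalent to: echo "hello world" | nc bigdata01 7777
        try (Socket socket = new Socket("bigdata01", 7777);
             OutputStream out = socket.getOutputStream()) {
            out.write("hello world\n".getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }
}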
Looking at the two HDFS directories, the data in both is identical, which shows the replicating selector was used.
2. Multiplexing selector
Each message can be routed to a specific channel when it is sent; only the sink attached to that channel receives the data, while the other sinks get nothing.
For example:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
# Use the value of the "state" attribute in each Event's header to choose the channel
a1.sources.r1.selector.header = state
# If state=CZ, choose channel c1
a1.sources.r1.selector.mapping.CZ = c1
# If state=US, choose channels c2 and c3
a1.sources.r1.selector.mapping.US = c2 c3
# Any other value falls through to the default channel c4
a1.sources.r1.selector.default = c4
(Keep comments on their own lines; a trailing # after a value may be read as part of the value.)
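For this routing to do anything, each event has to carry the state header the selector keys on. With the HTTP source the client can supply headers directly (as the curl demo further down shows); for other sources the header is typically stamped by an interceptor on the source. The class below is only a hedged sketch of such an interceptor: the class name and the rule that derives CZ/US from the body are my own assumptions, not part of the original post.
package com.bigdata;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
public class StateHeaderInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }
    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody());
        // Assumed rule for the demo: bodies mentioning "CZ" are tagged CZ, everything else US
        String state = body.contains("CZ") ? "CZ" : "US";
        // Put the routing key where the multiplexing selector will look for it
        event.getHeaders().put("state", state);
        return event;
    }
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event e : events) {
            intercept(e);
        }
        return events;
    }
    @Override
    public void close() {
    }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new StateHeaderInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }
}
It would be wired up the same way as DemoInterceptor above, e.g. a1.sources.r1.interceptors.i1.type = com.bigdata.StateHeaderInterceptor$Builder.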
A small difference worth noting:
The avro, syslogtcp, and http sources can all be given a hostname and port,
but each source is fed with a different client tool:
avro --> avro-client
syslogtcp --> nc
http --> curl
curl can simulate GET or POST requests, for example: curl www.baidu.com
Write the config, mul.conf:
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2
a1.sources.r1.type= http
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 8888
a1.sources.r1.selector.type=multiplexing
# header and mapping together decide which channel a message is routed to when it is sent
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.mapping.ORDER = c2
# Messages that match no mapping go to the default channel c1
a1.sources.r1.selector.default = c1
a1.channels.c1.type=memory
a1.channels.c2.type=memory
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y-%m-%d/mul
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=/flume/%Y-%m-%d/mul
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
Start the agent:
flume-ng agent -c ./ -f mul.conf -n a1 -Dflume.root.logger=INFO,console
Simulate HTTP requests:
curl -X POST -d '[{"headers":{"state":"USER"},"body":"this my multiplex to c1"}]' http://bigdata01:8888
curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://bigdata01:8888
The effect: a request sent with state=USER takes only the c1 path and produces one file; only a request sent down the other path produces a second, different file.
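The same POST can also be issued from Java. The sketch below is an assumed equivalent of the first curl command, using only the JDK's HttpURLConnection (hostname and port come from mul.conf above).
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
public class HttpSourceClient {
    public static void main(String[] args) throws Exception {
        // One event whose "state" header routes it to channel c1
        String payload = "[{\"headers\":{\"state\":\"USER\"},\"body\":\"this my multiplex to c1\"}]";
        URL url = new URL("http://bigdata01:8888");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        // 200 means the HTTP source accepted the event
        System.out.println("HTTP response code: " + conn.getResponseCode());
        conn.disconnect();
    }
}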
III. Automatic Failover
Several sinks form a sink group. Only one sink in the group does the work at any time; if it fails, another sink in the group automatically takes over.
To demonstrate this, three agents are used: bigdata01 receives the data and forwards it, through a sink group, to either bigdata02 or bigdata03.
Install Flume on bigdata02 and bigdata03.
Within a cluster you can distribute it with the sync scripts:
xsync.sh /opt/installs/flume/
xsync.sh /etc/profile
xcall.sh source /etc/profile
Or copy it over with scp directly, for example:
scp -r /opt/installs/flume1.9.0/ root@hadoop11:/opt/installs/
# If /etc/hosts has no hostname mapping, use the IP address instead
scp -r /opt/installs/flume1.9.0/ root@192.168.52.12:/opt/installs/
scp -r /etc/profile root@hadoop11:/etc
scp -r /etc/profile root@192.168.52.12:/etc
Both target machines then need to reload the environment:
source /etc/profile
On bigdata01, write the Flume config failover.conf:
#list names
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
# source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 10086
# channel
a1.channels.c1.type = memory
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 10087
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 10088
# Define the sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# Priorities: the sink with the higher priority is the one that does the work
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
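# maxpenalty: the maximum backoff, in milliseconds, applied to a failed sink before it is retried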
a1.sinkgroups.g1.processor.maxpenalty = 10000
flume-ng agent -c ../conf -f ./failover.conf -n a1 -Dflume.root.logger=INFO,console
On the second machine, bigdata02, write failover2.conf:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigdata02
a1.sources.r1.port = 10087
# channel
a1.channels.c1.type = memory
# sink
a1.sinks.k1.type = logger
Start the agent:
flume-ng agent -f ./failover2.conf -n a1 -Dflume.root.logger=INFO,console
On bigdata03, write failover3.conf:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigdata03
a1.sources.r1.port = 10088
# channel
a1.channels.c1.type = memory
# sink
a1.sinks.k1.type = logger
flume-ng agent -f ./failover3.conf -n a1 -Dflume.root.logger=INFO,console
Once bigdata02 and bigdata03 start without errors, start the agent on bigdata01:
flume-ng agent -f ./failover.conf -n a1 -Dflume.root.logger=INFO,console
On bigdata01, send a message:
echo "wei,wei,wei" | nc bigdata01 10086
bigdata02 reacts and shows the message, while bigdata03 does not, because bigdata02 (k1) has the higher priority and therefore does the work.
To test failover, stop the agent on bigdata02 and send another message from bigdata01: bigdata03 now receives it, so the failover worked.
And if bigdata02 comes back up? It takes over the work again, since it still has the higher priority.
IV. Load Balancing
When an event is sent, which sink in the group handles it is decided in one of two ways: round_robin or random.
Demo:
On bigdata01, create balance.conf:
#list names
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
# source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 10086
# channel
a1.channels.c1.type = memory
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 10087
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 10088
# Define the sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
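# selector can be round_robin or random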
a1.sinkgroups.g1.processor.selector = random
bigdata02 and bigdata03 keep the same configs as before, and their agents must be running.
Send requests with nc as before; bigdata02 and bigdata03 handle the incoming data at random.
- Note that Flume's round-robin selection happens once per batch interval, not once per event, so several events that arrive within the same interval may all be delivered to the same sink.