Flink CDC 自定义反序列化
CDC 读取 MySQL 数据，封装成 JSONObject
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
/**
 * Debezium deserialization schema that turns each change record into a JSON string.
 *
 * <p>Emitted JSON layout:
 * <pre>
 * {
 *   "database":  "...",
 *   "tableName": "...",
 *   "type":      the record's Envelope.Operation,
 *   "before":    {"id": "...", "name": "...", ...},
 *   "after":     {"id": "...", "name": "...", ...}
 * }
 * </pre>
 *
 * <p>"before" / "after" are emitted as empty objects when the record carries no
 * such row image.
 */
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    /**
     * Converts one change record into a JSON string and hands it to the collector.
     *
     * @param sourceRecord the change record; its topic is split on '.' and the
     *                     second and third segments are used as database and table
     *                     name, and its value is read as a {@link Struct} with
     *                     "before" and "after" fields
     * @param collector    receives exactly one JSON string per call
     * @throws Exception declared by the implemented interface
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // Database and table name come from the topic segments at index 1 and 2.
        String[] topicParts = sourceRecord.topic().split("\\.");
        String database = topicParts[1];
        String tableName = topicParts[2];

        Struct value = (Struct) sourceRecord.value();

        // Either row image may be null (e.g. no "before" on an insert, no "after"
        // on a delete), so the conversion must be null-safe.
        JSONObject beforeJson = structToJson(value.getStruct("before"));
        JSONObject afterJson = structToJson(value.getStruct("after"));

        // Operation type of the change event.
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);

        // Assemble the final JSON object.
        JSONObject json = new JSONObject();
        json.put("database", database);
        json.put("tableName", tableName);
        json.put("before", beforeJson);
        json.put("after", afterJson);
        json.put("type", operation);

        // Emit the serialized record.
        collector.collect(json.toJSONString());
    }

    /**
     * Copies every field of a struct into a new JSON object, keyed by field name.
     *
     * @param struct the row image to convert; may be null
     * @return a JSON object holding the struct's fields, or an empty object when
     *         {@code struct} is null
     */
    private static JSONObject structToJson(Struct struct) {
        JSONObject result = new JSONObject();
        if (struct == null) {
            return result;
        }
        for (Field field : struct.schema().fields()) {
            result.put(field.name(), struct.get(field));
        }
        return result;
    }

    /**
     * Reports the type of the elements produced by this schema.
     *
     * @return the type information for {@code String}
     */
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}