Kafka、Kafka Streams、Drools、Redis 和分布式数据库的风控系统程序
由于实时风控系统难度较大,集成框架设计各个单位均有特点,快速建立一个通用性较强,学习、实施和使用成本较低的框架尤其重要。
提供一个简化的 Java 程序示例,演示如何将 Kafka 消息中间件、Kafka Streams 计算引擎、Drools 规则引擎、Redis 内存数据库和分布式数据库集成在一起。程序的主要功能是:
- 从 Kafka 中消费实时交易数据。
- 从 Redis 获取对应的风险标签,如果没有则从分布式数据库获取并更新到 Redis。
- 使用 Drools 规则引擎对交易数据和风险标签进行评估。
- 将评估结果发送回支付业务系统或记录下来。
示例图:
实时交易模块:接收交易数据 -> 获取风险标签(Redis)---> 调用规则引擎 —> 评估结果返回
↓ ↓ ↑
规则引擎模块:交易数据 + 风险标签 ---> 规则执行 -----------> 输出评估结果(通过/拒绝)
为了简化示例,我们将:
创建一个简单的 Kafka 生产者,向 transaction-topic
发送交易数据。
2. 生产测试数据
- 使用简单的交易数据结构和风险标签。
- 定义基本的 Drools 规则。
- 使用内存中的 H2 数据库模拟分布式数据库
-
项目结构和依赖
1. 项目结构
risk-control-demo/ ├── src/ │ ├── main/ │ │ ├── java/ │ │ │ └── com.example.riskcontrol/ │ │ │ ├── RiskControlApplication.java // 主应用程序 │ │ │ ├── Transaction.java // 交易数据模型 │ │ │ ├── RiskTag.java // 风险标签模型 │ │ │ ├── RiskEvaluator.java // 风险评估类 │ │ │ ├── RedisService.java // Redis 服务类 │ │ │ ├── DatabaseService.java // 数据库服务类 │ │ │ └── KafkaStreamsConfig.java // Kafka Streams 配置 │ │ └── resources/ │ │ ├── drools/ │ │ │ └── rules.drl // Drools 规则文件 │ │ └── application.properties // 应用程序配置 ├── pom.xml // Maven 项目配置
- 2. 依赖库(在
pom.xml
中) -
<dependencies> <!-- Kafka Streams --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>3.4.0</version> </dependency> <!-- Drools Core --> <dependency> <groupId>org.kie</groupId> <artifactId>kie-api</artifactId> <version>7.73.0.Final</version> </dependency> <dependency> <groupId>org.drools</groupId> <artifactId>drools-core</artifactId> <version>7.73.0.Final</version> </dependency> <dependency> <groupId>org.drools</groupId> <artifactId>drools-compiler</artifactId> <version>7.73.0.Final</version> </dependency> <!-- Redis Client (Jedis) --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>4.3.1</version> </dependency> <!-- H2 Database --> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <version>2.1.214</version> <scope>runtime</scope> </dependency> <!-- JSON Processing --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.14.0</version> </dependency> <!-- Logging --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.36</version> </dependency> </dependencies>
详细代码
1. Transaction.java(交易数据模型)
-
package com.example.riskcontrol; import java.io.Serializable; public class Transaction implements Serializable { private String transactionId; private String accountId; private double amount; private long timestamp; // Constructors, getters, setters, toString() public Transaction() { } public Transaction(String transactionId, String accountId, double amount, long timestamp) { this.transactionId = transactionId; this.accountId = accountId; this.amount = amount; this.timestamp = timestamp; } // Getters and Setters // toString() method }
2. RiskTag.java(风险标签模型)
-
package com.example.riskcontrol; import java.io.Serializable; public class RiskTag implements Serializable { private String accountId; private int riskLevel; // 1-低风险, 2-中风险, 3-高风险 // Constructors, getters, setters, toString() public RiskTag() { } public RiskTag(String accountId, int riskLevel) { this.accountId = accountId; this.riskLevel = riskLevel; } // Getters and Setters // toString() method }
3. RedisService.java(Redis 服务类)
-
package com.example.riskcontrol; import redis.clients.jedis.Jedis; public class RedisService { private Jedis jedis; public RedisService(String host, int port) { jedis = new Jedis(host, port); } public RiskTag getRiskTag(String accountId) { String riskLevelStr = jedis.get("risk:" + accountId); if (riskLevelStr != null) { int riskLevel = Integer.parseInt(riskLevelStr); return new RiskTag(accountId, riskLevel); } return null; } public void setRiskTag(RiskTag riskTag) { jedis.set("risk:" + riskTag.getAccountId(), String.valueOf(riskTag.getRiskLevel())); } public void close() { jedis.close(); } }
4. DatabaseService.java(数据库服务类)
-
package com.example.riskcontrol; import java.sql.*; public class DatabaseService { private Connection connection; public DatabaseService() throws SQLException { // 连接 H2 内存数据库 connection = DriverManager.getConnection("jdbc:h2:mem:testdb"); initializeDatabase(); } private void initializeDatabase() throws SQLException { Statement stmt = connection.createStatement(); // 创建风险标签表 String sql = "CREATE TABLE IF NOT EXISTS risk_tags (" + "account_id VARCHAR(255) PRIMARY KEY," + "risk_level INT" + ")"; stmt.executeUpdate(sql); // 插入示例数据 sql = "INSERT INTO risk_tags (account_id, risk_level) VALUES ('account123', 2)"; stmt.executeUpdate(sql); stmt.close(); } public RiskTag getRiskTag(String accountId) throws SQLException { String sql = "SELECT risk_level FROM risk_tags WHERE account_id = ?"; PreparedStatement pstmt = connection.prepareStatement(sql); pstmt.setString(1, accountId); ResultSet rs = pstmt.executeQuery(); if (rs.next()) { int riskLevel = rs.getInt("risk_level"); rs.close(); pstmt.close(); return new RiskTag(accountId, riskLevel); } else { rs.close(); pstmt.close(); return null; } } public void close() throws SQLException { connection.close(); } }
5. RiskEvaluator.java(风险评估类)
-
package com.example.riskcontrol; import org.kie.api.KieServices; import org.kie.api.runtime.KieContainer; import org.kie.api.runtime.KieSession; public class RiskEvaluator { private KieSession kieSession; public RiskEvaluator() { // 初始化 Drools KieServices kieServices = KieServices.Factory.get(); KieContainer kieContainer = kieServices.newKieClasspathContainer(); kieSession = kieContainer.newKieSession("ksession-rules"); } public boolean evaluate(Transaction transaction, RiskTag riskTag) { kieSession.insert(transaction); kieSession.insert(riskTag); int fired = kieSession.fireAllRules(); kieSession.dispose(); return fired > 0; } }
6. drools/rules.drl(Drools 规则文件)
-
package com.example.riskcontrol import com.example.riskcontrol.Transaction; import com.example.riskcontrol.RiskTag; rule "High Risk Transaction" when $transaction : Transaction( amount > 10000 ) $riskTag : RiskTag( riskLevel == 3 ) then System.out.println("High risk transaction detected: " + $transaction); end rule "Medium Risk Transaction" when $transaction : Transaction( amount > 5000 && amount <= 10000 ) $riskTag : RiskTag( riskLevel >= 2 ) then System.out.println("Medium risk transaction detected: " + $transaction); end rule "Low Risk Transaction" when $transaction : Transaction() $riskTag : RiskTag( riskLevel == 1 ) then System.out.println("Transaction passed: " + $transaction); end
7. KafkaStreamsConfig.java(Kafka Streams 配置)
-
package com.example.riskcontrol; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import java.util.Properties; public class KafkaStreamsConfig { public static Properties getProperties() { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "risk-control-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return props; } }
8. RiskControlApplication.java(主应用程序)
-
package com.example.riskcontrol; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import java.sql.SQLException; public class RiskControlApplication { public static void main(String[] args) throws SQLException { // 初始化服务 RedisService redisService = new RedisService("localhost", 6379); DatabaseService databaseService = new DatabaseService(); RiskEvaluator riskEvaluator = new RiskEvaluator(); // 配置 Kafka Streams StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> sourceStream = builder.stream("transaction-topic"); // 处理流 sourceStream.foreach((key, value) -> { try { ObjectMapper objectMapper = new ObjectMapper(); Transaction transaction = objectMapper.readValue(value, Transaction.class); // 从 Redis 获取风险标签 RiskTag riskTag = redisService.getRiskTag(transaction.getAccountId()); if (riskTag == null) { // 如果 Redis 中没有,从数据库获取并更新到 Redis riskTag = databaseService.getRiskTag(transaction.getAccountId()); if (riskTag != null) { redisService.setRiskTag(riskTag); } else { // 如果数据库中也没有,设定默认风险标签 riskTag = new RiskTag(transaction.getAccountId(), 1); } } // 使用 Drools 进行风险评估 boolean isRisk = riskEvaluator.evaluate(transaction, riskTag); // 根据评估结果进行处理 if (isRisk) { System.out.println("Transaction " + transaction.getTransactionId() + " is risky. Action: Block"); // 发送阻止交易的消息或记录日志 } else { System.out.println("Transaction " + transaction.getTransactionId() + " is safe. Action: Approve"); // 发送通过交易的消息或记录日志 } } catch (Exception e) { e.printStackTrace(); } }); // 启动 Kafka Streams KafkaStreams streams = new KafkaStreams(builder.build(), KafkaStreamsConfig.getProperties()); streams.start(); // 添加关闭钩子 Runtime.getRuntime().addShutdownHook(new Thread(() -> { streams.close(); redisService.close(); try { databaseService.close(); } catch (SQLException e) { e.printStackTrace(); } })); } }
运行示例
1. 启动必要的服务
- Redis:确保 Redis 服务在本地的
6379
端口运行。 - Kafka:确保 Kafka 服务在本地的
9092
端口运行,并创建主题transaction-topic
。
package com.example.riskcontrol;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class TransactionProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
try {
ObjectMapper objectMapper = new ObjectMapper();
// 创建示例交易数据
Transaction transaction = new Transaction(
"tx1001", "account123", 12000.0, System.currentTimeMillis());
String transactionJson = objectMapper.writeValueAsString(transaction);
ProducerRecord<String, String> record = new ProducerRecord<>(
"transaction-topic", transaction.getTransactionId(), transactionJson);
producer.send(record);
System.out.println("Transaction sent: " + transactionJson);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
. 运行应用程序
- 先运行
RiskControlApplication
,启动风控系统。 - 再运行
TransactionProducer
,发送交易数据。
4. 预期输出
风控系统将处理交易数据,使用 Drools 规则引擎进行评估,并根据规则打印评估结果。例如:
High risk transaction detected: Transaction{transactionId='tx1001', accountId='account123', amount=12000.0, timestamp=...}
Transaction tx1001 is risky. Action: Block
说明
- Kafka Streams:用于实时消费交易数据,并进行数据处理。
- Drools:规则引擎,用于评估交易的风险级别。
- Redis:作为缓存,存储风险标签,快速获取账户的风险级别。
- 分布式数据库(H2 数据库模拟):当 Redis 中没有风险标签时,从数据库获取,并更新到 Redis。
- 风险标签:简单地使用风险级别(1-低风险,2-中风险,3-高风险)来表示。
注意事项
- 异常处理:在实际应用中,需要更完善的异常处理机制,防止因异常导致程序崩溃。
- 多线程与并发:在高并发场景下,需要考虑线程安全和性能优化。
- 资源管理:确保所有的资源(如数据库连接、Redis 连接、Kafka Streams)在程序结束时正确关闭。
- 配置管理:将硬编码的配置(如主机地址、端口、主题名)提取到配置文件中,便于管理和修改。
5、系统整体各个模块的调度关系流程
以下是系统各模块之间的交互流程,详细说明了调度关系:
-
交易数据的接收与预处理:
- 支付业务系统将实时交易数据通过消息队列模块(Kafka)或接口与通信模块(API/gRPC)发送到实时交易数据处理模块。
- 实时交易数据处理模块接收数据后,进行数据预处理,如格式验证和完整性检查。
-
风险标签的获取:
- 实时交易数据处理模块需要获取交易涉及的账户或用户的风险标签。
- 首先从**数据存储与缓存模块(Redis)**中查询风险标签。
- 如果缓存中没有对应的风险标签,则从分布式数据库中读取,并更新到缓存。
-
风险评估:
- 实时交易数据处理模块将交易数据和风险标签一起传递给规则引擎模块。
- 规则引擎模块根据预定义的业务规则,对交易进行风险评估,生成评估结果(如通过、拒绝、需人工审核)。
-
评估结果的返回:
- 规则引擎模块将评估结果返回给实时交易数据处理模块。
- 实时交易数据处理模块通过接口与通信模块将评估结果反馈给支付业务系统,执行相应的业务操作。
-
风险标签的批量更新:
- 批量风险标签处理模块定期执行,获取历史数据进行风险标签的重新计算。
- 计算出的风险标签存储在分布式数据库中,并同步更新到Redis 缓存。
-
系统监控与安全:
- 监控与运维模块持续监控各模块的状态和性能,收集日志信息,设置报警机制。
- 安全与合规模块确保数据传输和存储的安全性,对各模块的访问进行权限控制,满足合规要求。
[支付业务系统]
|
v
1. 发送交易数据
|
v
[消息队列模块(Kafka)/接口与通信模块(API/gRPC)]
|
v
[实时交易数据处理模块]
|
+--> 2. 从缓存获取风险标签
| |
| v
| [数据存储与缓存模块(Redis)]
| |
| 若未命中
| v
| 从数据库获取并更新缓存
| |
| [分布式数据库]
|
+--> 3. 调用规则引擎模块
| |
| v
| [规则引擎模块]
| |
| 执行风险评估
| |
| 返回评估结果
|
+--> 4. 返回评估结果给支付业务系统
| |
v v
[接口与通信模块] <---> [支付业务系统]
总结
上述示例提供了一个基本的程序框架,演示了如何将 Kafka、Kafka Streams、Drools、Redis 和分布式数据库集成在一起,完成实时风控的基本功能。在实际项目中,需要根据具体的业务需求和技术环境,对程序进行扩展和优化。