微服务设计模式 - 事件溯源模式(Event Sourcing Pattern)
微服务设计模式 - 事件溯源模式(Event Sourcing Pattern)
定义
事件溯源(Event Sourcing)是一种将所有状态更改保存为一系列事件的设计模式。每次系统状态发生变化时,都会生成一个事件,这些事件在事件存储库(Event Store)中按照时间顺序保存。通过重放这些事件,可以重建对象的当前状态。
结构
事件溯源模式的关键组件包括:
- 命令(Command): 修改系统状态的操作。
- 事件(Event): 描述系统状态变化的记录。
- 事件存储库(Event Store): 存储所有事件的数据库。
- 查询(Query): 获取系统或对象当前状态的操作,通过重放事件来构建当前状态。
命令 (Command)
|
v
事件 (Event)
|
v
事件存储库 (Event Store)
|
v
重放事件 (Replaying Events)
|
v
当前状态 (Current State)
工作方式
-
接收命令: 当系统接收一个命令时,比如"创建订单"。
-
生成事件: 根据命令生成一个事件,如"订单已创建或者订单已取消"。
-
存储事件: 将生成的事件存储在事件存储库中。
-
重放事件: 当需要查询当前状态时,通过重放所有相关事件来构建对象的当前状态。
好处
- 可追溯性: 每个状态变化都可以通过事件追踪和重现,这对于调试、审计和恢复非常有用。
- 可靠性: 事件作为持久记录,可以重放以恢复任何时间点的系统状态。
- 解耦: 命令和查询的分离使得系统能针对不同需求进行优化(这个通常与CQRS模式结合)。
应用场景
- 订单管理系统: 在电子商务系统中,订单的生命周期从创建到取消都可以通过事件进行跟踪。
- 金融交易系统: 金融交易的每个步骤都可以完整记录,确保数据的准确性和可追溯性。
示例代码片段
假设我们有一个订单服务,通过事件溯源记录和重建订单状态。
// 事件类
public class OrderPlacedEvent {
private String orderId;
private String product;
private int quantity;
// getters and setters
}
// 事件存储库接口
public interface EventStore {
void saveEvent(Event event);
List<Event> getEvents(String aggregateId);
}
// 内存事件存储库的简单实现
public class InMemoryEventStore implements EventStore {
private Map<String, List<Event>> store = new HashMap<>();
@Override
public void saveEvent(Event event) {
store.computeIfAbsent(event.getAggregateId(), k -> new ArrayList<>()).add(event);
}
@Override
public List<Event> getEvents(String aggregateId) {
return store.getOrDefault(aggregateId, Collections.emptyList());
}
}
// 订单聚合根
public class Order {
private String orderId;
private List<Event> changes = new ArrayList<>();
public void placeOrder(String productId, int quantity) {
applyChange(new OrderPlacedEvent(orderId, productId, quantity));
}
private void applyChange(Event event) {
// apply event to current state
changes.add(event);
}
public List<Event> getUncommittedChanges() {
return changes;
}
}
// 订单服务
public class OrderService {
private EventStore eventStore;
public OrderService(EventStore eventStore) {
this.eventStore = eventStore;
}
public void placeOrder(String orderId, String productId, int quantity) {
Order order = new Order(orderId);
order.placeOrder(productId, quantity);
eventStore.saveEvent(new OrderPlacedEvent(orderId, productId, quantity));
}
public Order getOrder(String orderId) {
List<Event> events = eventStore.getEvents(orderId);
Order order = new Order(orderId);
events.forEach(event -> order.applyChange(event));
return order;
}
}
Axon框架
介绍
Axon框架是一个专注于实现CQRS(Command Query Responsibility Segregation,命令查询责任分离)和事件溯源(Event Sourcing)模式的Java框架。它简化了复杂分布式系统的实现,通过结构化的方式处理事件和命令,使系统更易于扩展、维护和调试。该框架高度模块化,可以与Spring Boot无缝集成,也可以使用其他依赖注入框架或独立使用。
概念
1. 聚合(Aggregate)
在Axon中,聚合是应用程序逻辑的核心。它是处理命令和定义领域逻辑的实体或对象集合。聚合确保所有的修改操作以一致的方式应用。
2. 命令(Command)
命令是对聚合执行操作的请求。命令是指令性的,它表示希望系统发生某些变化。命令是由用户或系统其他部分生成,并由聚合处理。
3. 事件(Event)
事件是系统中已经发生的事实。它们是对命令执行结果的具体描述。事件由事件溯源仓储(Event Sourcing Repository)存储,可以用于重建聚合的状态。
4. 命令处理器(Command Handler)
命令处理器是处理命令并触发事件的组件。在Axon中,命令处理器通常是定义在聚合中的。
5. 事件处理器(Event Handler)
事件处理器用于处理产生的事件,并更新读模型或执行其他逻辑。
6. 命令网关(Command Gateway)
命令网关提供了一个简化发送命令的API,处理命令的路由、序列化和分发。
7. 事件存储(Event Store)
在事件驱动架构中,事件存储是一个核心组件,它记录所有由系统生成的事件,而不是仅仅存储应用程序的当前状态。事件存储的主要作用包括:
- 记录所有事件: 每次系统状态发生变化时,都会生成一个事件,这些事件被按时间顺序存储下来。事件存储使得我们能够追溯和重放这些事件,从而重建任何时间点的系统状态。
- 事件回放: 可以重放所有事件来重建聚合的当前状态,这在需要恢复或重建数据时非常有用。
- 审计和调试: 提供了系统行为的完整日志,可以用于审计和调试。
在Axon框架中,事件存储可以配置使用各种存储引擎,如关系型数据库(通过JDBC)、MongoDB、Axon Server等。Axon框架提供了灵活的方式来配置和使用事件存储。接下来,本来将在Spring Boot应用程序中配置Axon框架,并使用H2嵌入式数据库作为事件存储。
注解Annotation
以下是Axon框架中一些常用的重要注解及其用途:
1. @Aggregate
用于标记一个聚合根。这告诉Axon这是一个聚合,可以处理命令并生成事件。
@Aggregate
public class OrderAggregate {
// Fields, Command Handlers, Event Sourcing Handlers
}
2. @CommandHandler
用于标记处理命令的方法或构造函数。该注解可以应用于聚合内部的方法或构造函数。
@CommandHandler
public void handle(CreateOrderCommand command) {
// Command handling logic
}
3. @EventSourcingHandler
用于标记处理事件且根据事件更新聚合状态的方法。事件溯源处理器确保每个事件都能正确应用到聚合状态。
@EventSourcingHandler
public void on(OrderCreatedEvent event) {
// Update aggregate state based on event
}
4. @QueryHandler
用于标记处理查询的方法。在CQRS模式下,查询处理器负责处理查询请求,并从读模型或事件溯源重建模型中获取数据。
@QueryHandler
public Order handle(OrderQuery query) {
// Query handling logic to retrieve data
}
5. @Saga
Saga用于管理长时间运行的业务流程和跨多个聚合的事务。Saga负责协调和维护多个步骤之间的状态。
@Saga
public class OrderManagementSaga {
// Fields, Event Handlers, Logic for managing saga state
}
在Spring Boot中的应用
Spring Boot可以通过事件溯源模式增强微服务架构的可扩展性和可靠性。我们可以使用Axon Framework
,一个专门用于CQRS和事件溯源的框架。
项目结构
src
|-- main
|-- java
|-- com.example.eventdriven
|-- aggregate
|-- OrderAggregate.java
|-- command
|-- CreateOrderCommand.java
|-- event
|-- OrderCreatedEvent.java
|-- OrderCancelledEvent.java
|-- query
|-- OrderQueryHandler.java
|-- Order.java
|-- OrderQuery.java
|-- controller
|-- OrderController.java
|-- EventDrivenApplication.java
|-- resources
|-- application.yml
项目依赖
pom.xml
<dependencies>
<!-- Spring Boot dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Axon Framework dependencies -->
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.5.5</version>
</dependency>
<!-- Spring Data JPA for persistence -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- H2 Database for in-memory storage -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
项目配置
在application.yml
,配置Axon框架和H2数据库。
# This configuration tells Axon to set up an event store with snapshot thresholds and default event handling processors. The storage engine (H2 database, in this case) is configured through Spring Data JPA settings
server:
port: 8080
spring:
datasource:
url: jdbc:h2:mem:testdb
driver-class-name: org.h2.Driver
username: sa
password: password
h2:
console:
enabled: true
jpa:
hibernate:
ddl-auto: update
show-sql: true
axon:
eventhandling:
processors:
default:
mode: subscribing
eventstore:
snapshot-threshold: 100
主要源代码
1. Order Aggregate
// Define the Aggregate that will handle commands and produce events.
// File: src/main/java/com/example/eventdriven/aggregate/OrderAggregate.java
package com.example.eventdriven.aggregate;
import com.example.eventdriven.command.CreateOrderCommand;
import com.example.eventdriven.event.OrderCreatedEvent;
import com.example.eventdriven.event.OrderCancelledEvent;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
@Aggregate
public class OrderAggregate {
private static final Logger logger = LoggerFactory.getLogger(OrderAggregate.class);
@AggregateIdentifier
private String orderId;
private String product;
private int quantity;
private boolean active;
public OrderAggregate() {
// Default constructor needed by Axon
}
@CommandHandler
public OrderAggregate(CreateOrderCommand command) {
// Validate command logic here, e.g., checking for null values or constraints
apply(new OrderCreatedEvent(command.getOrderId(), command.getProduct(), command.getQuantity()));
}
// Event Sourcing Handler for OrderCreatedEvent
@EventSourcingHandler
public void on(OrderCreatedEvent event) {
this.orderId = event.getOrderId();
this.product = event.getProduct();
this.quantity = event.getQuantity();
this.active = true;
logger.info("Order created: {}", event);
}
// Event Sourcing Handler for OrderCancelledEvent
@EventSourcingHandler
public void on(OrderCancelledEvent event) {
this.active = false;
logger.info("Order cancelled: {}", event);
}
}
在Axon框架中,apply
函数是一个核心函数,用于在聚合内部记录和发布事件。通过调用apply
方法,可以将一个事件记录下来,并确保相应的事件处理逻辑被触发。apply
函数的主要功能包括:
- 触发事件处理器(Event Sourcing Handler):
apply
方法触发标注了@EventSourcingHandler
注解的方法,从而更新聚合的内部状态。- 这确保每个事件相关的状态变更都被正确应用到聚合中。
- 发布事件(Publish Event):
- 事件会被发布到事件总线(Event Bus),从而通知所有对该事件感兴趣的组件。
- 其他聚合、查询模型或外部系统可以订阅并响应这些事件。
2. Order Command
// Define a command class that carries the data necessary to create a new order.
// File: src/main/java/com/example/eventdriven/command/CreateOrderCommand.java
package com.example.eventdriven.command;
import org.axonframework.modelling.command.TargetAggregateIdentifier;
public class CreateOrderCommand {
@TargetAggregateIdentifier
private String orderId;
private String product;
private int quantity;
public CreateOrderCommand(String orderId, String product, int quantity) {
this.orderId = orderId;
this.product = product;
this.quantity = quantity;
}
// Getters and setters
public String getOrderId() {
return orderId;
}
public String getProduct() {
return product;
}
public int getQuantity() {
return quantity;
}
}
3. Order Events
// Define an event class that represents the creation of an order.
// File: src/main/java/com/example/eventdriven/event/OrderCreatedEvent.java
package com.example.eventdriven.event;
public class OrderCreatedEvent {
private String orderId;
private String product;
private int quantity;
public OrderCreatedEvent(String orderId, String product, int quantity) {
this.orderId = orderId;
this.product = product;
this.quantity = quantity;
}
// Getters and setters
public String getOrderId() {
return orderId;
}
public String getProduct() {
return product;
}
public int getQuantity() {
return quantity;
}
@Override
public String toString() {
return "OrderCreatedEvent{" +
"orderId='" + orderId + '\'' +
", product='" + product + '\'' +
", quantity=" + quantity +
'}';
}
}
// Define an event class that represents the Cancellation of an order.
// File: src/main/java/com/example/eventdriven/event/OrderCancelledEvent.java
package com.example.eventdriven.event;
public class OrderCancelledEvent {
private String orderId;
public OrderCancelledEvent(String orderId) {
this.orderId = orderId;
}
// Getters and setters
public String getOrderId() {
return orderId;
}
@Override
public String toString() {
return "OrderCancelledEvent{" +
"orderId='" + orderId + '\'' +
'}';
}
}
4. Order Query Handling
// Define a query handler to handle queries related to orders.
// File: src/main/java/com/example/eventdriven/query/OrderQueryHandler.java
package com.example.eventdriven.query;
import com.example.eventdriven.event.OrderCreatedEvent;
import com.example.eventdriven.event.OrderCancelledEvent;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class OrderQueryHandler {
@Autowired
private EventStore eventStore;
@QueryHandler
public Order handle(OrderQuery query) {
List<?> events = eventStore.readEvents(query.getOrderId()).asStream().toList();
Order order = new Order();
events.forEach(event -> {
if (event instanceof OrderCreatedEvent) {
order.apply((OrderCreatedEvent) event);
} else if (event instanceof OrderCancelledEvent) {
order.apply((OrderCancelledEvent) event);
}
// Handle other event types similarly
});
return order;
}
}
// File: src/main/java/com/example/eventdriven/query/Order.java
package com.example.eventdriven.query;
import com.example.eventdriven.event.OrderCreatedEvent;
import com.example.eventdriven.event.OrderCancelledEvent;
public class Order {
private String orderId;
private String product;
private int quantity;
private boolean active;
public void apply(OrderCreatedEvent event) {
this.orderId = event.getOrderId();
this.product = event.getProduct();
this.quantity = event.getQuantity();
this.active = true;
}
public void apply(OrderCancelledEvent event) {
this.active = false;
}
// Getters and other methods
public String getOrderId() {
return orderId;
}
public String getProduct() {
return product;
}
public int getQuantity() {
return quantity;
}
public boolean isActive() {
return active;
}
@Override
public String toString() {
return "Order{" +
"orderId='" + orderId + '\'' +
", product='" + product + '\'' +
", quantity=" + quantity +
", active=" + active +
'}';
}
}
// File: src/main/java/com/example/eventdriven/query/OrderQuery.java
package com.example.eventdriven.query;
public class OrderQuery {
private String orderId;
public OrderQuery(String orderId) {
this.orderId = orderId;
}
public String getOrderId() {
return orderId;
}
}
5. Order Controller
// Create a REST controller to handle HTTP requests.
// File: src/main/java/com/example/eventdriven/controller/OrderController.java
package com.example.eventdriven.controller;
import com.example.eventdriven.command.CreateOrderCommand;
import com.example.eventdriven.query.OrderQuery;
import com.example.eventdriven.query.Order;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.queryhandling.QueryGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private CommandGateway commandGateway;
@Autowired
private QueryGateway queryGateway;
@PostMapping
public String createOrder(@RequestParam String product, @RequestParam int quantity) {
String orderId = UUID.randomUUID().toString();
CreateOrderCommand command = new CreateOrderCommand(orderId, product, quantity);
commandGateway.sendAndWait(command);
return "Order created with ID: " + orderId;
}
@GetMapping("/{orderId}")
public CompletableFuture<Order> getOrder(@PathVariable String orderId) {
return queryGateway.query(new OrderQuery(orderId), Order.class);
}
}
6. Application Entry Point
// Define the main application class.
// File: src/main/java/com/example/eventdriven/EventDrivenApplication.java
package com.example.eventdriven;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class EventDrivenApplication {
public static void main(String[] args) {
SpringApplication.run(EventDrivenApplication.class, args);
}
}
总结
事件溯源模式通过将所有状态变化都记录为事件,实现了系统状态的可追溯性和可恢复性。这种模式在处理复杂状态变化和高可用性需求的系统(例如订单管理系统、金融交易系统)特别有用。在Spring Boot中,我们可以借助Axon Framework来实现事件溯源模式,从而提升微服务架构的扩展性和可靠性。