Java 中的事件溯源模式:为健壮系统构建不可变的历史记录
也称为
- 事件日志记录
- 事件流
事件溯源设计模式的意图
事件溯源是一种设计模式,它主张将状态更改存储为一系列事件。与其更新数据库中的记录,不如将所有更改存储为独立的事件,这些事件在重播时可以重新创建应用程序在任何时间点的状态。
事件溯源模式的详细解释,并附带实际示例
现实世界中的例子
考虑一个跟踪用户帐户所有交易的银行应用程序。在这个系统中,每次存款、取款和转账都会被记录为事件日志中的独立事件。它不是仅仅更新当前的账户余额,而是将每笔交易都存储为一个离散事件。这种方法允许银行维护所有帐户活动的完整且不可变的历史记录。如果出现差异,银行可以重播事件序列以在任何时间点重建帐户状态。这提供了可靠的审计跟踪,便于调试,并支持交易回滚和历史数据分析等功能。
通俗地说
事件溯源将所有状态更改记录为一系列不可变事件,以确保可靠的状态重建和可审计性。
事件溯源模式定义了一种处理数据操作的方法,该方法由一系列事件驱动,每个事件都记录在仅追加存储中。应用程序代码发送一系列事件,这些事件以命令式的方式描述已在数据上执行的每个操作,并将它们持久化到事件存储中。每个事件表示对数据的更改集(例如,将商品添加到订单中)。
Java 中事件溯源模式的编程示例
在编程示例中,我们将在银行账户之间转账一些资金。
Event
类管理一个事件队列,并控制异步处理的线程操作。每个事件都可以看作是影响系统状态的状态更改。
public class Event {
private static final Event INSTANCE = new Event();
private static final int MAX_PENDING = 16;
private int headIndex;
private int tailIndex;
private volatile Thread updateThread = null;
private final EventMessage[] pendingEvents = new EventMessage[MAX_PENDING];
Event() {}
public static Event getInstance() {
return INSTANCE;
}
}
triggerEvent
方法是创建事件的地方。每次触发事件时,都会创建事件并将其添加到队列中。此事件包含状态更改的详细信息。
public void triggerEvent(EventMessage eventMessage) {
init();
for(var i = headIndex; i != tailIndex; i = (i + 1) % MAX_PENDING) {
var pendingEvent = getPendingEvents()[i];
if(pendingEvent.equals(eventMessage)) {
return;
}
}
getPendingEvents()[tailIndex] = eventMessage;
tailIndex = (tailIndex + 1) % MAX_PENDING;
}
init
和 startThread
方法确保线程正确初始化并运行。stopService
方法用于在不再需要时停止线程。这些方法管理用于处理事件的线程的生命周期。
public synchronized void stopService() throws InterruptedException {
if(updateThread != null) {
updateThread.interrupt();
updateThread.join();
updateThread = null;
}
}
public synchronized boolean isServiceRunning() {
return updateThread != null && updateThread.isAlive();
}
public void init() {
if(updateThread == null) {
updateThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
update();
}
});
startThread();
}
}
private synchronized void startThread() {
if (!updateThread.isAlive()) {
updateThread.start();
headIndex = 0;
tailIndex = 0;
}
}
示例由 App
类及其 main
方法驱动。
@Slf4j
public class App {
public static final int ACCOUNT_OF_DAENERYS = 1;
public static final int ACCOUNT_OF_JON = 2;
public static void main(String[] args) {
var eventProcessor = new DomainEventProcessor(new JsonFileJournal());
LOGGER.info("Running the system first time............");
eventProcessor.reset();
LOGGER.info("Creating the accounts............");
eventProcessor.process(new AccountCreateEvent(
0, new Date().getTime(), ACCOUNT_OF_DAENERYS, "Daenerys Targaryen"));
eventProcessor.process(new AccountCreateEvent(
1, new Date().getTime(), ACCOUNT_OF_JON, "Jon Snow"));
LOGGER.info("Do some money operations............");
eventProcessor.process(new MoneyDepositEvent(
2, new Date().getTime(), ACCOUNT_OF_DAENERYS, new BigDecimal("100000")));
eventProcessor.process(new MoneyDepositEvent(
3, new Date().getTime(), ACCOUNT_OF_JON, new BigDecimal("100")));
eventProcessor.process(new MoneyTransferEvent(
4, new Date().getTime(), new BigDecimal("10000"), ACCOUNT_OF_DAENERYS,
ACCOUNT_OF_JON));
LOGGER.info("...............State:............");
LOGGER.info(AccountAggregate.getAccount(ACCOUNT_OF_DAENERYS).toString());
LOGGER.info(AccountAggregate.getAccount(ACCOUNT_OF_JON).toString());
LOGGER.info("At that point system had a shut down, state in memory is cleared............");
AccountAggregate.resetState();
LOGGER.info("Recover the system by the events in journal file............");
eventProcessor = new DomainEventProcessor(new JsonFileJournal());
eventProcessor.recover();
LOGGER.info("...............Recovered State:............");
LOGGER.info(AccountAggregate.getAccount(ACCOUNT_OF_DAENERYS).toString());
LOGGER.info(AccountAggregate.getAccount(ACCOUNT_OF_JON).toString());
}
}
运行示例会产生以下控制台输出。
22:40:47.982 [main] INFO com.iluwatar.event.sourcing.app.App -- Running the system first time............
22:40:47.984 [main] INFO com.iluwatar.event.sourcing.app.App -- Creating the accounts............
22:40:47.985 [main] INFO com.iluwatar.event.sourcing.domain.Account -- Some external api for only realtime execution could be called here.
22:40:48.089 [main] INFO com.iluwatar.event.sourcing.domain.Account -- Some external api for only realtime execution could be called here.
22:40:48.090 [main] INFO com.iluwatar.event.sourcing.app.App -- Do some money operations............
22:40:48.090 [main] INFO com.iluwatar.event.sourcing.domain.Account -- Some external api for only realtime execution could be called here.
22:40:48.095 [main] INFO com.iluwatar.event.sourcing.domain.Account -- Some external api for only realtime execution could be called here.
22:40:48.099 [main] INFO com.iluwatar.event.sourcing.domain.Account -- Some external api for only realtime execution could be called here.
22:40:48.099 [main] INFO com.iluwatar.event.sourcing.domain.Account -- Some external api for only realtime execution could be called here.
22:40:48.101 [main] INFO com.iluwatar.event.sourcing.app.App -- ...............State:............
22:40:48.104 [main] INFO com.iluwatar.event.sourcing.app.App -- Account{accountNo=1, owner='Daenerys Targaryen', money=90000}
22:40:48.104 [main] INFO com.iluwatar.event.sourcing.app.App -- Account{accountNo=2, owner='Jon Snow', money=10100}
22:40:48.104 [main] INFO com.iluwatar.event.sourcing.app.App -- At that point system had a shut down, state in memory is cleared............
22:40:48.104 [main] INFO com.iluwatar.event.sourcing.app.App -- Recover the system by the events in journal file............
22:40:48.124 [main] INFO com.iluwatar.event.sourcing.app.App -- ...............Recovered State:............
22:40:48.124 [main] INFO com.iluwatar.event.sourcing.app.App -- Account{accountNo=1, owner='Daenerys Targaryen', money=90000}
22:40:48.124 [main] INFO com.iluwatar.event.sourcing.app.App -- Account{accountNo=2, owner='Jon Snow', money=10100}
在这个示例中,可以通过重播队列中的事件来在任何时间点重建系统的状态。这是事件溯源模式的一个关键特性。
何时在 Java 中使用事件溯源模式
- 在需要完整审计跟踪和历史更改的系统中。
- 在应用程序状态从一系列更改中推导出来的复杂领域中。
- 对于能够从高可用性和可扩展性中受益的系统,因为事件溯源自然适合分布式系统。
Java 中事件溯源模式的实际应用
- 金融系统,用于跟踪交易和帐户余额随时间的变化。
- 电子商务应用程序,用于订单和库存管理。
- 实时数据处理系统,其中事件一致性和可重播性至关重要。
- LMAX 架构
事件溯源模式的优点和缺点
好处
- 可审计性:对状态的每一次更改都会被记录,从而实现全面的审计。
- 可重播性:可以重新处理事件以重新创建历史状态或转移到新状态。
- 可扩展性:可以异步和并行处理事件。
权衡
- 复杂性:实施和维护事件溯源系统可能会引入额外的复杂性。
- 事件存储大小:存储每个状态更改会导致大量数据。
- 事件版本控制:事件结构随时间的变化需要仔细处理,以确保系统完整性。
相关的 Java 设计模式
- 命令查询职责分离 (CQRS):通常与事件溯源一起使用,以分离读写职责,从而提高性能和可扩展性。
- 快照:用于优化事件溯源系统,通过定期保存当前状态以避免重播长时间的事件序列。