Java 中的扇出扇入模式:最大化并发以实现高效数据处理
也称为
- 散布收集
扇出/扇入设计模式的意图
Java 中的扇出/扇入设计模式旨在通过将任务分解成多个可并行处理的子任务(扇出)来提高并发性和优化处理时间,然后将这些子任务的结果合并成一个结果(扇入)。
扇出/扇入模式的详细解释以及现实世界中的例子
现实世界的例子
Java 中扇出/扇入模式的一个现实世界的例子是 UberEats 或 DoorDash 等外卖服务。当客户下单时,服务(扇出)会向不同的餐厅发送独立的任务,以准备各种商品。每家餐厅独立工作以准备其订单的一部分。一旦所有餐厅都完成了其任务,送货服务(扇入)将来自不同餐厅的商品汇总成一个订单,确保所有商品都一起送到客户手中。这种并行处理提高了效率并确保了及时送货。
通俗地说
扇出/扇入模式将任务分布到多个并发进程或线程中,然后聚合结果。
维基百科说
在消息导向中间件中,扇出模式通过并行将消息传递到一个或多个目的地来模拟信息交换,而无需等待响应。这允许一个进程同时将任务分配给多个接收方。
另一方面,扇入概念通常指的是多个输入的聚合。在数字电子学中,它描述了逻辑门可以处理的输入数量。结合这些概念,软件工程中的扇出/扇入模式涉及分配任务(扇出)然后聚合结果(扇入)。
Java 中扇出/扇入模式的编程示例
提供的实现涉及一个数字列表,目标是将它们平方并聚合结果。FanOutFanIn
类接收数字列表作为 SquareNumberRequest
对象和一个 Consumer
实例,该实例在请求完成时收集平方结果。每个 SquareNumberRequest
都用随机延迟对其数字进行平方运算,模拟一个在不可预测的时间完成的长时间运行的进程。Consumer
实例在各个 SquareNumberRequest
对象在不同时间变得可用时从它们收集结果。
以下是用 Java 编写的 FanOutFanIn
类,它通过异步分配请求来演示扇出/扇入模式。
public class FanOutFanIn {
public static Long fanOutFanIn(final List<SquareNumberRequest> requests, final Consumer consumer) {
ExecutorService service = Executors.newFixedThreadPool(requests.size());
// fanning out
List<CompletableFuture<Void>> futures = requests
.stream()
.map(request -> CompletableFuture.runAsync(() -> request.delayedSquaring(consumer), service))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return consumer.getSumOfSquaredNumbers().get();
}
}
Consumer
用作回调类,将在请求完成后调用。这将聚合来自所有请求的结果。
public class Consumer {
private final AtomicLong sumOfSquaredNumbers;
Consumer(Long init) {
sumOfSquaredNumbers = new AtomicLong(init);
}
public Long add(final Long num) {
return sumOfSquaredNumbers.addAndGet(num);
}
}
请求表示为 SquareNumberRequest
,它使用随机延迟对数字进行平方运算,并在平方完成后调用 Consumer
。
public class SquareNumberRequest {
private final Long number;
public void delayedSquaring(final Consumer consumer) {
var minTimeOut = 5000L;
SecureRandom secureRandom = new SecureRandom();
var randomTimeOut = secureRandom.nextInt(2000);
try {
// this will make the thread sleep from 5-7s.
Thread.sleep(minTimeOut + randomTimeOut);
} catch (InterruptedException e) {
LOGGER.error("Exception while sleep ", e);
Thread.currentThread().interrupt();
} finally {
consumer.add(number * number);
}
}
}
以下是包含 main 方法以驱动示例的 App 类。
public static void main(String[] args) {
final List<Long> numbers = Arrays.asList(1L, 3L, 4L, 7L, 8L);
LOGGER.info("Numbers to be squared and get sum --> {}", numbers);
final List<SquareNumberRequest> requests =
numbers.stream().map(SquareNumberRequest::new).toList();
var consumer = new Consumer(0L);
// Pass the request and the consumer to fanOutFanIn or sometimes referred as Orchestrator
// function
final Long sumOfSquaredNumbers = FanOutFanIn.fanOutFanIn(requests, consumer);
LOGGER.info("Sum of all squared numbers --> {}", sumOfSquaredNumbers);
}
运行示例会生成以下控制台输出。
06:52:04.622 [main] INFO com.iluwatar.fanout.fanin.App -- Numbers to be squared and get sum --> [1, 3, 4, 7, 8]
06:52:11.465 [main] INFO com.iluwatar.fanout.fanin.App -- Sum of all squared numbers --> 139
何时在 Java 中使用扇出/扇入模式
Java 中的扇出/扇入设计模式适用于可以分解并并行执行任务的场景,特别适用于数据处理、批处理以及需要从各种来源聚合结果的情况。
扇出/扇入模式 Java 教程
Java 中扇出/扇入模式的现实世界应用
- Java 中的扇出/扇入模式广泛应用于大型数据处理应用程序中。
- 在将响应发送给客户之前需要从多个来源进行聚合的服务,例如分布式缓存或负载均衡系统。
扇出/扇入模式的优点和权衡
优点
- 通过并行处理提高性能。
- 提高系统响应能力。
- 高效利用多核处理器架构。
权衡
- 错误处理的复杂性增加。
- 由于任务同步和结果聚合,可能会导致开销增加。
- 依赖于底层基础设施支持并发执行的能力。
相关的 Java 设计模式
- MapReduce:与扇出/扇入类似,MapReduce 也涉及将任务分布到多个工作节点(映射)和聚合结果(归约),这对于处理大型数据集特别有用。
- 命令:命令模式有助于将发送方和接收方解耦,类似于扇出/扇入模式如何将任务提交与任务处理解耦。
- 生产者-消费者:通过组织任务执行来协同工作,生产者分配由多个消费者处理的任务,然后合并结果,从而提高数据处理的吞吐量和效率。