Java 中的生产者-消费者模式:简化生产和消费流程
也称为
- 有界缓冲区
- 消费者-生产者
生产者-消费者设计模式的意图
生产者-消费者设计模式是并发 Java 应用程序的关键组成部分,用于解耦生产和消费数据的任务,使生产者能够并发地生成数据,而消费者能够并发地处理数据,而彼此之间没有直接依赖关系。
生产者-消费者模式的详细解释以及现实世界中的例子
现实世界中的例子
在典型的汽车制造场景中,生产者-消费者模式有助于同步操作,确保高效的组装和安装流程。想象一个汽车制造厂,生产的各个阶段都在进行。 “生产者”可以是组装汽车发动机的车间,而“消费者”可以是将发动机安装到车身的车间。发动机组装完成后,会放在传送带上(充当缓冲区)。安装车间从传送带上取下发动机并安装到汽车上。这使发动机组装和发动机安装流程能够独立运行,传送带管理这两个阶段之间的同步。如果组装车间生产发动机的速度快于安装车间安装发动机的速度,多余的发动机将暂时存储在传送带上。相反,如果安装车间需要发动机,但组装车间暂时停止工作,它仍然可以处理传送带上现有的发动机。
简单来说
它提供了一种在以不同速度运行的多个循环之间共享数据的方法。
维基百科说
Dijkstra 阐述了这种情况:“我们考虑两个进程,分别称为‘生产者’和‘消费者’。生产者是一个循环进程,它产生一定数量的信息,这些信息需要由消费者处理。消费者也是一个循环进程,它需要处理生产者产生的下一部分信息。我们假设这两个进程通过一个具有无限容量的缓冲区连接以实现此目的。”
Java 中生产者-消费者模式的编程示例
考虑一个物品的制造流程,生产者需要在制造管道已满时暂停生产,而消费者需要在制造管道为空时暂停消费。我们可以将生产和消费过程分开,它们协同工作并在不同的时间暂停。
在这个 Java 示例中,生产者
生成存储在 ItemQueue
中的项目,展示了高效的线程管理和数据同步,这些对于高性能的 Java 应用程序至关重要。
我们有一个简单的 Item
记录。它们存储在 ItemQueue
中。
public record Item(String producer, int id) {}
public class ItemQueue {
private final BlockingQueue<Item> queue;
public ItemQueue() {
queue = new LinkedBlockingQueue<>(5);
}
public void put(Item item) throws InterruptedException {
queue.put(item);
}
public Item take() throws InterruptedException {
return queue.take();
}
}
生产者
将项目生产到 ItemQueue
中。
public class Producer {
private static final SecureRandom RANDOM = new SecureRandom();
private final ItemQueue queue;
private final String name;
private int itemId;
public Producer(String name, ItemQueue queue) {
this.name = name;
this.queue = queue;
}
public void produce() throws InterruptedException {
var item = new Item(name, itemId++);
queue.put(item);
Thread.sleep(RANDOM.nextInt(2000));
}
}
然后,我们有 消费者
类,它从 ItemQueue
中获取项目。
@Slf4j
public class Consumer {
private final ItemQueue queue;
private final String name;
public Consumer(String name, ItemQueue queue) {
this.name = name;
this.queue = queue;
}
public void consume() throws InterruptedException {
var item = queue.take();
LOGGER.info("Consumer [{}] consume item [{}] produced by [{}]", name,
item.getId(), item.getProducer());
}
}
现在,在制造管道的过程中,我们可以从 生产者
和 消费者
类实例化对象,因为它们从队列中生产和消费项目。
public static void main(String[] args) {
var queue = new ItemQueue();
var executorService = Executors.newFixedThreadPool(5);
for (var i = 0; i < 2; i++) {
final var producer = new Producer("Producer_" + i, queue);
executorService.submit(() -> {
while (true) {
producer.produce();
}
});
}
for (var i = 0; i < 3; i++) {
final var consumer = new Consumer("Consumer_" + i, queue);
executorService.submit(() -> {
while (true) {
consumer.consume();
}
});
}
executorService.shutdown();
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
executorService.shutdownNow();
} catch (InterruptedException e) {
LOGGER.error("Error waiting for ExecutorService shutdown");
}
}
程序输出
08:10:08.008 [pool-1-thread-3] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_0] consume item [0] produced by [Producer_1]
08:10:08.008 [pool-1-thread-4] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_1] consume item [0] produced by [Producer_0]
08:10:08.517 [pool-1-thread-5] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_2] consume item [1] produced by [Producer_0]
08:10:08.952 [pool-1-thread-3] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_0] consume item [1] produced by [Producer_1]
08:10:09.208 [pool-1-thread-4] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_1] consume item [2] produced by [Producer_0]
08:10:09.354 [pool-1-thread-5] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_2] consume item [2] produced by [Producer_1]
08:10:10.214 [pool-1-thread-3] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_0] consume item [3] produced by [Producer_1]
08:10:10.585 [pool-1-thread-4] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_1] consume item [3] produced by [Producer_0]
08:10:11.530 [pool-1-thread-5] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_2] consume item [4] produced by [Producer_1]
08:10:11.682 [pool-1-thread-3] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_0] consume item [4] produced by [Producer_0]
08:10:11.781 [pool-1-thread-4] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_1] consume item [5] produced by [Producer_0]
08:10:12.209 [pool-1-thread-5] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_2] consume item [5] produced by [Producer_1]
08:10:13.045 [pool-1-thread-3] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_0] consume item [6] produced by [Producer_0]
08:10:13.861 [pool-1-thread-4] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_1] consume item [6] produced by [Producer_1]
08:10:14.739 [pool-1-thread-5] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_2] consume item [7] produced by [Producer_1]
08:10:14.842 [pool-1-thread-3] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_0] consume item [7] produced by [Producer_0]
08:10:15.975 [pool-1-thread-4] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_1] consume item [8] produced by [Producer_0]
08:10:16.378 [pool-1-thread-5] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_2] consume item [8] produced by [Producer_1]
08:10:16.967 [pool-1-thread-3] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_0] consume item [9] produced by [Producer_0]
08:10:17.417 [pool-1-thread-4] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_1] consume item [9] produced by [Producer_1]
08:10:17.483 [pool-1-thread-5] INFO com.iluwatar.producer.consumer.Consumer -- Consumer [Consumer_2] consume item [10] produced by [Producer_1]
生产者-消费者模式的详细解释以及现实世界中的例子

何时在 Java 中使用生产者-消费者模式
- 当您需要管理一个缓冲区或队列,生产者向其中添加数据,而消费者从其中获取数据时,这通常在多线程环境中。
- 当解耦数据的生产和消费对于应用程序的设计、性能或可维护性有利时。
- 适用于需要对共享资源或数据结构进行同步访问的场景。
Java 中生产者-消费者模式的现实世界应用
- 线程池,其中工作线程充当消费者,处理由另一个线程产生的任务。
- 日志框架,其中日志消息由应用程序的各个部分产生,并由日志服务消费。
- 分布式系统中的消息队列,用于服务之间的异步通信。
生产者-消费者模式的优缺点
优点
- 解耦:生产者和消费者可以独立运行,简化系统设计。
- 性能提升:允许多个生产者和消费者线程并发工作,提高吞吐量。
- 灵活性:易于扩展以添加更多生产者或消费者,而无需对现有系统进行重大更改。
权衡
- 复杂性:需要仔细处理同步和潜在的死锁。
- 资源管理:正确管理缓冲区大小以避免溢出或下溢情况。