Java 中的毒丸模式:优雅地终止多线程进程
大约 3 分钟
也称为
- 关闭信号
毒丸设计模式的意图
毒丸设计模式用于通过向消息队列发送特殊消息(“毒丸”)来优雅地关闭服务或生产者-消费者系统,该消息指示不再发送更多消息,允许消费者终止。
毒丸模式的详细解释及现实世界示例
现实世界示例
毒丸设计模式的现实世界类比是零售店中使用“已关闭”标志。 当商店准备关门时,经理会在门上放置一个“已关闭”标志。 这个标志向任何新顾客发出信号,表示不再接受顾客,但它不会立即将已经进入商店的顾客赶出去。 商店工作人员会继续为剩余的顾客服务,让他们完成购买,然后再最终锁门并关灯。 同样,在毒丸模式中,特殊的“毒丸”消息会向消费者发出信号,让他们停止接受新的任务,同时允许他们完成当前任务的处理,然后优雅地关闭。
通俗地说
毒丸是一种已知的结束消息交换的消息结构。
Java 中毒丸模式的编程示例
在此 Java 示例中,毒丸充当消息队列中的关闭信号,展示了有效的线程管理和消费者通信。
首先定义消息结构。 有接口Message
和实现SimpleMessage
。
public interface Message {
// Other properties and methods...
enum Headers {
DATE, SENDER
}
void addHeader(Headers header, String value);
String getHeader(Headers header);
Map<Headers, String> getHeaders();
void setBody(String body);
String getBody();
}
public class SimpleMessage implements Message {
private final Map<Headers, String> headers = new HashMap<>();
private String body;
@Override
public void addHeader(Headers header, String value) {
headers.put(header, value);
}
@Override
public String getHeader(Headers header) {
return headers.get(header);
}
@Override
public Map<Headers, String> getHeaders() {
return Collections.unmodifiableMap(headers);
}
@Override
public void setBody(String body) {
this.body = body;
}
@Override
public String getBody() {
return body;
}
}
为了传递消息,我们使用消息队列。 这里我们定义了与消息队列相关的类型:MqPublishPoint
、MqSubscribePoint
和MessageQueue
。 SimpleMessageQueue
实现了所有这些接口。
public interface MqPublishPoint {
void put(Message msg) throws InterruptedException;
}
public interface MqSubscribePoint {
Message take() throws InterruptedException;
}
public interface MessageQueue extends MqPublishPoint, MqSubscribePoint {}
public class SimpleMessageQueue implements MessageQueue {
private final BlockingQueue<Message> queue;
public SimpleMessageQueue(int bound) {
queue = new ArrayBlockingQueue<>(bound);
}
@Override
public void put(Message msg) throws InterruptedException {
queue.put(msg);
}
@Override
public Message take() throws InterruptedException {
return queue.take();
}
}
接下来,我们需要消息Producer
和Consumer
。 它们在内部使用上面的消息队列。 需要注意的是,当Producer
停止时,它会向Consumer
发送毒丸,以通知它消息传递已结束。
public class Producer {
// Other properties and methods...
public void send(String body) {
if (isStopped) {
throw new IllegalStateException(String.format(
"Producer %s was stopped and fail to deliver requested message [%s].", body, name));
}
var msg = new SimpleMessage();
msg.addHeader(Headers.DATE, new Date().toString());
msg.addHeader(Headers.SENDER, name);
msg.setBody(body);
try {
queue.put(msg);
} catch (InterruptedException e) {
// allow thread to exit
LOGGER.error("Exception caught.", e);
}
}
public void stop() {
isStopped = true;
try {
queue.put(Message.POISON_PILL);
} catch (InterruptedException e) {
// allow thread to exit
LOGGER.error("Exception caught.", e);
}
}
}
public class Consumer {
// Other properties and methods...
public void consume() {
while (true) {
try {
var msg = queue.take();
if (Message.POISON_PILL.equals(msg)) {
LOGGER.info("Consumer {} receive request to terminate.", name);
break;
}
var sender = msg.getHeader(Headers.SENDER);
var body = msg.getBody();
LOGGER.info("Message [{}] from [{}] received by [{}]", body, sender, name);
} catch (InterruptedException e) {
// allow thread to exit
LOGGER.error("Exception caught.", e);
return;
}
}
}
}
最后,我们准备展示整个示例的实际操作。
public static void main(String[] args) {
var queue = new SimpleMessageQueue(10000);
final var producer = new Producer("PRODUCER_1", queue);
final var consumer = new Consumer("CONSUMER_1", queue);
new Thread(consumer::consume).start();
new Thread(() -> {
producer.send("hand shake");
producer.send("some very important information");
producer.send("bye!");
producer.stop();
}).start();
}
程序输出
07:43:01.518 [Thread-0] INFO com.iluwatar.poison.pill.Consumer -- Message [hand shake] from [PRODUCER_1] received by [CONSUMER_1]
07:43:01.520 [Thread-0] INFO com.iluwatar.poison.pill.Consumer -- Message [some very important information] from [PRODUCER_1] received by [CONSUMER_1]
07:43:01.520 [Thread-0] INFO com.iluwatar.poison.pill.Consumer -- Message [bye!] from [PRODUCER_1] received by [CONSUMER_1]
07:43:01.520 [Thread-0] INFO com.iluwatar.poison.pill.Consumer -- Consumer CONSUMER_1 receive request to terminate.
毒丸模式的详细解释及现实世界示例

何时在 Java 中使用毒丸模式
在以下情况下使用毒丸模式
- 系统需要在多线程环境中具有强大的容错性和无缝的消费者关闭能力。
- 在生产者-消费者场景中,消费者需要了解消息处理的结束。
- 确保消费者可以在关闭之前完成剩余消息的处理。
Java 中毒丸模式的现实世界应用
- 使用特殊任务来表示关闭的 Java ExecutorService 关闭。
- 消息系统,其中特定消息表示队列处理的结束。
- Akka 框架
毒丸模式的优缺点
好处
- 简化了消费者的关闭过程。
- 确保所有挂起的任务在终止之前完成。
- 将关闭逻辑与主要处理逻辑解耦。
权衡
- 需要消费者检查毒丸,增加了开销。
- 如果管理不当,可能会导致消费者无法识别毒丸,从而导致无限期阻塞。