spirng 5.1.7
rabbitmq 2.1.8
java 1.8
我在测试消息可靠投递的confirm时,是用topic模式来测试的。 当我向交换机发送消息时,我会回调 confirm(CorrelationData correlationData, boolean ack, String cause)。 即使ack发送成功,ack始终为false,并且队列里有这条消息,消费者也可以消费这条消息。
code
producer
<rabbitmq:queue id="spring_topic_queue01" name="spring_topic_queue01" auto-declare="true"/> <rabbitmq:queue id="spring_topic_queue02" name="spring_topic_queue02" auto-declare="true"/> <rabbitmq:topic-exchange name="spring_topic_exchange"> <rabbitmq:bindings> <rabbitmq:binding pattern="*.java.#" queue="spring_topic_queue01"/> <rabbitmq:binding pattern="*.*.spring" queue="spring_topic_queue02"/> <rabbitmq:binding pattern="sql.#" queue="spring_topic_queue02"/> </rabbitmq:bindings> </rabbitmq:topic-exchange>
consumer
<bean id="topicQueueListener01" class="com.zyl.TopicQueueListener01"/> <bean id="topicQueueListener02" class="com.zyl.TopicQueueListener02"/> <rabbitmq:listener-container connection-factory="connectionFactory"> <rabbitmq:listener ref="topicQueueListener01" queue-names="spring_topic_queue01"/> <rabbitmq:listener ref="topicQueueListener02" queue-names="spring_topic_queue02"/> </rabbitmq:listener-container>
linstener
public class TopicQueueListener01 implements MessageListener { @Override public void onMessage(Message message) { System.out.println("Consumer1:"+new String(message.getBody())); } } public class TopicQueueListener02 implements MessageListener { @Override public void onMessage(Message message) { System.out.println("Consumer2:"+new String(message.getBody())); } }
test
@Test public void topicTest() throws InterruptedException { rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) System.out.println("ok!\n"+cause); else System.out.println("fail!\n"+cause); }); rabbitTemplate.convertAndSend("spring_topic_exchange", "sql.ssm.out", "Hello RabbitMQ!"); Thread.sleep(10); }
在测试代码中最后一行,如果加了Thread.sleep(10);
则正确输出,ack为true,输出结果为:ok! null
,如果没写这行则输出fail! clean channel shutdown; protocol method: #method\<channel.close\>(reply-code=200, reply-text=OK,class-id=0, method-id=0)
.
链接工厂的publisher-confirms="true"
属性我也加了。
正常情况下应该是不用写sleep()吧?
channel关闭异常,这是使用@Test做单元测试才会出现的情况,还未等待channel正常关闭,程序就开始关闭(消息可能还未发送完成),
rabbitTemplate.convertAndSend("spring_topic_exchange", "sql.ssm.out", "Hello RabbitMQ!");
你加的sleep就是起到了等待消息发送、分发、监听完成的作用,在正常使用中不会有这样的问题。
import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; // ... @Test public void topicTest() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) System.out.println("ok!\n" + cause); else System.out.println("fail!\n" + cause); latch.countDown(); }); rabbitTemplate.convertAndSend("spring_topic_exchange", "sql.ssm.out", "Hello RabbitMQ!"); latch.await(5, TimeUnit.SECONDS); // 等待最多 5 秒,直到 confirmCallback 被触发 }