#独家
spring集成rabbitmq的confirm方法中,ack一直为false?

2023-05-16 0 4,213

spirng 5.1.7
rabbitmq 2.1.8
java 1.8

我在测试消息可靠投递的confirm时,是用topic模式来测试的。 当我向交换机发送消息时,我会回调 confirm(CorrelationData correlationData, boolean ack, String cause)。 即使ack发送成功,ack始终为false,并且队列里有这条消息,消费者也可以消费这条消息。

code

producer

  <rabbitmq:queue id="spring_topic_queue01" name="spring_topic_queue01" auto-declare="true"/>
    <rabbitmq:queue id="spring_topic_queue02" name="spring_topic_queue02" auto-declare="true"/>
    <rabbitmq:topic-exchange name="spring_topic_exchange">
        <rabbitmq:bindings>
            <rabbitmq:binding pattern="*.java.#" queue="spring_topic_queue01"/>
            <rabbitmq:binding pattern="*.*.spring" queue="spring_topic_queue02"/>
            <rabbitmq:binding pattern="sql.#" queue="spring_topic_queue02"/>
        </rabbitmq:bindings>
    </rabbitmq:topic-exchange>

consumer

<bean id="topicQueueListener01" class="com.zyl.TopicQueueListener01"/>
    <bean id="topicQueueListener02" class="com.zyl.TopicQueueListener02"/>
    
    <rabbitmq:listener-container connection-factory="connectionFactory">
    
        <rabbitmq:listener ref="topicQueueListener01" queue-names="spring_topic_queue01"/>
        <rabbitmq:listener ref="topicQueueListener02" queue-names="spring_topic_queue02"/>

    </rabbitmq:listener-container>

linstener

public class TopicQueueListener01 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println("Consumer1:"+new String(message.getBody()));
    }
}

public class TopicQueueListener02 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println("Consumer2:"+new String(message.getBody()));
    }
}

test

@Test
    public void topicTest() throws InterruptedException {
  
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack)
                System.out.println("ok!\n"+cause);
            else
                System.out.println("fail!\n"+cause);
        });

        rabbitTemplate.convertAndSend("spring_topic_exchange",
                "sql.ssm.out", "Hello RabbitMQ!");

        Thread.sleep(10);
    }

在测试代码中最后一行,如果加了Thread.sleep(10);则正确输出,ack为true,输出结果为:ok! null,如果没写这行则输出fail! clean channel shutdown; protocol method: #method\<channel.close\>(reply-code=200, reply-text=OK,class-id=0, method-id=0).
链接工厂的publisher-confirms="true"属性我也加了。

正常情况下应该是不用写sleep()吧?

channel关闭异常,这是使用@Test做单元测试才会出现的情况,还未等待channel正常关闭,程序就开始关闭(消息可能还未发送完成),

rabbitTemplate.convertAndSend("spring_topic_exchange", "sql.ssm.out", "Hello RabbitMQ!");

你加的sleep就是起到了等待消息发送、分发、监听完成的作用,在正常使用中不会有这样的问题。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// ...

@Test
public void topicTest() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack)
            System.out.println("ok!\n" + cause);
        else
            System.out.println("fail!\n" + cause);
        
        latch.countDown();
    });

    rabbitTemplate.convertAndSend("spring_topic_exchange",
            "sql.ssm.out", "Hello RabbitMQ!");

    latch.await(5, TimeUnit.SECONDS); // 等待最多 5 秒,直到 confirmCallback 被触发
}
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

1. JK下载官网所有资源来源于开发团队,加入会员即可下载使用!如有问题请联系右下角在线客服!
2. JK下载官方保障所有软件都通过人工亲测,为每位会员用户提供安全可靠的应用软件、游戏资源下载及程序开发服务。
3. JK开发团队针对会员诉求,历经多年拥有现今开发成果, 每款应用程序上线前都经过人工测试无误后提供安装使用,只为会员提供安全原创的应用。
4. PC/移动端应用下载后如遇安装使用问题请联系右下角在线客服或提交工单,一对一指导解决疑难。

JK软件下载官网 技术分享 spring集成rabbitmq的confirm方法中,ack一直为false? https://www.jkxiazai.com/1732.html

JK软件应用商店是经过官方安全认证,保障正版软件平台

相关资源

官方客服团队

为您解决烦忧 - 24小时在线 专业服务