0

    RabbitMQ防止消息丢失

    2023.04.25 | admin | 274次围观

    生产者没有成功把消息发送到MQ

    丢失的原因 :因为网络传输的不稳定性,当生产者在向MQ发送消息的过程中,MQ没有成功接收到消息接收服务器消息回调url失败,但是生产者却以为MQ成功接收到了消息,不会再次重复发送该消息,从而导致消息的丢失。

    解决办法 : 有两个解决办法:事务机制和confirm机制,最常用的是confirm机制(发布确认机制)。

    注意:

    RabbitMQ的事务机制是同步的,很耗型能,会降低RabbitMQ的吞吐量。

    confirm机制是异步的,生成者发送完一个消息之后,不需要等待RabbitMQ的回调,就可以发送下一个消息,当RabbitMQ成功接收到消息之后会自动异步的回调生产者的一个接口返回成功与否的消息。

    两个机制说明如下:

    confirm(发布确认)机制

    解释:RabbitMQ可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,生产者每次写的消息都会分配一个唯一的 id,如果消息成功写入 RabbitMQ 中,RabbitMQ 会给生产者回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,生产者可以重新发送。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调接收服务器消息回调url失败,那么可以重发。

    代码

    yml配置

    RabbitMQ防止消息丢失

    ----------------------------------------------------------------------------------------------------

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    /**
    * 交换机回滚
    */
    @Component
    @Slf4j
    public class ExchangeCallback implements RabbitTemplate.ConfirmCallback{
        /* correlationData 内含消息内容
         * ack 交换机接受成功或者失败。 true表示交换机接受消息成功, false表示交换机接受失败
         * cause 表示失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("hello world");
            String id = correlationData.getId();
            String message = new String(correlationData.getReturnedMessage().getBody());
            if (ack){
                log.info("交换机收到消息id为{}, 消息内容为{}", id, message);
            }else {
                log.info("交换机未收到消息id为{}, 消息内容为{}, 原因为{}", id, message, cause);
            }
        }
    }

    ----------------------------------------队列防止消息丢失----------------------------------------------------------------

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    /**
     * 队列防止消息丢失
     */
    @Slf4j
    @Component
    public class QueueCallback implements RabbitTemplate.ReturnCallback{
        @Override
        public void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {
            log.info("消息 {} 经交换机 {} 通过routingKey={} 路由到队列失败,失败code为:{}, 失败原因为:{}",
                    new String(message.getBody()), exchange, routingKey, replyCode, replyText);
        }
    }

    --------------------------引用->controller-----------------------------------------------

    //交换机回滚
    @Autowired
    private ExchangeCallback exchangeCallback;

    //队列回滚
    @Autowired
    private QueueCallback queueCallback;

    /**
     * 初始化交换机监听
     */
    @PostConstruct
    public void init(){  

    //交换机
    rabbitTemplate.setConfirmCallback(exchangeCallback);
    /**
     * true:交换机无法将消息进行路由时,会将该消息返回给生产者
     * false:如果发现消息无法进行路由,则直接丢弃
     */
    rabbitTemplate.setMandatory(true);
    //队列
    rabbitTemplate.setReturnCallback(queueCallback);

    }

    /**
         * 发送消息
         * 结果:"这是一条消息"
         */
        @GetMapping("/sendMessageTest")
        public String sendMessageTest(){
            // 消息类型为object 发送对象也是可以的
            String msg = "这是一条消息";
            // 第一个参数为发送消息到那个交换机上,第二个是发送的路由键(交换机进行需要符合绑定的队列),第三个参数为发送的消息
    //CommonUtils.dirExchange--自己的交换机名称
    //CommonUtils.routingKey --路由Key值 
            rabbitTemplate.convertAndSend("1235",CommonUtils.routingKey,msg);
            System.out.println("消息发送成功:"+msg);
            return "发送成功;发送内容为:"+msg;
        }

    运行结果:

    版权声明

    本文仅代表作者观点。
    本文系作者授权发表,未经许可,不得转载。

    发表评论