当前位置:首页 > > 架构师社区
[导读]我们知道,消息在RabbitMQ的整个生命周期是生产者投递消息到Exchange,Exchange根据路由键将消息路由到合适的Queue,Queue再将消息推(或消费者主动拉)给消费者。

我们知道,消息在RabbitMQ的整个生命周期是生产者投递消息ExchangeExchange根据路由键消息路由到合适的QueueQueue再将消息推(或消费者主动拉)给消费者

在这个过程当中,Exchange根据路由键将消息路由到合适的Queue的过程,可能发生诸如

  1. Exchange没有任何Queue与其绑定,
  2. 或者根据消息的路由键,没有任何一个合适的Queue来投递消息,

从而导致消息路由失败。对于这些路由失败的消息应该如何处理呢?有两种方式:

  1. 将消息返回给投递该条消息的生产者。
  2. 使用备份交换机 alternate-exchange(AE)。

方式1:将消息返回给投递该条消息的生产者

  • 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing

# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
spring.rabbitmq.template.mandatory=true
# 必须设置为true,否则消息消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=true
  • 交换机定义与消息发送
@Slf4j @Component public class NoMatchQueue { /**
     * 交换机名称
     */ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE"; @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void send() {
        log.info("发送消息");
        Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
        Message message = MessageBuilder
                .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
                .setContentEncoding(StandardCharsets.UTF_8.displayName())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
    }
} @Configuration class ExchangeDeclare { /**
     * 只定义一个交换机,但是不绑定任何Queue,所以发送到该Exchange的消息都会路由失败
     *
     * @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder
                .topicExchange(NoMatchQueue.EXCHANGE_NAME)
                .durable(true)
                .build();
    }
}
  • 设置回调函数
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("消息被退回:{}", returnedMessage);
    }
});
  • 消息被退回:且可以看到原因是无法路由

RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)


方式2:使用备份交换机

使用方式1需要我们在程序中进行编码设置回调函数监听,增加了生产者代码的复杂性,那么为了消息不丢失还有没有其他方式来处理路由失败的消息呢:答案是使用备份交换机

  • 相较于使用回调函数,使用备份交换机只需要给交换机绑定一个备份交换机即可,当消息路由失败之后,消息将投递到备份交换机,再由备份交换机路由消息到备份队列。这样我们只需要关注这个备份队列就能知道/获取到路由失败的消息。通常情况下备份交换的Type应该设置为 fanout。
  • 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing

# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
spring.rabbitmq.template.mandatory=false
# 必须设置为true,否则消息消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=false
  • 注意: 使用备份交换机模式,mandatory将无效,即就算mandatory设置为false,路由失败的消息同样会被投递到绑定的备份交换机。
  • 正常业务交换机(不绑定队列,使得消息一定会路由失败)
/**
 * 业务交换机
 *
 * @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder
            .topicExchange(NoMatchQueueAlternateExchange.EXCHANGE_NAME)
            .durable(true) // 绑定备份交换机 .alternate(X_ALTERNATE)
            .build();
}
  • 备份交换机/队列/绑定
/**
 * 备份队列
 *
 * @return */ @Bean public Queue alternateQueue() { return QueueBuilder
            .durable("Q_ALTERNATE")
            .build();
} /**
 * 备份交换机
 *
 * @return */ @Bean public Exchange alternateExchange() { return ExchangeBuilder
            .fanoutExchange(X_ALTERNATE)
            .durable(true)
            .build();
} /**
 * 备份绑定
 *
 * @param alternateExchange
 * @param alternateQueue
 * @return */ @Bean public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) { return BindingBuilder
            .bind(alternateQueue)
            .to(alternateExchange)
            .with("")
            .noargs();
}
  • 消息投递
/**
 * 正常业务交换机
 */ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE_ALTERNATE"; @Autowired private RabbitTemplate rabbitTemplate; /**
 * 发送消息
 */ @PostConstruct public void send() {
    log.info("发送消息");
    Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
    Message message = MessageBuilder
            .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
            .setContentEncoding(StandardCharsets.UTF_8.displayName())
            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
            .build();
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
}
  • 结果是消息被路由到备份交换机的备份队列

RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)

  • 且: 如果你同时使用了两种方式,即(mandatory为true+Listener监听)和(备份交换机AlternateExchange),消息将只会路由到备份交换机,不会Return回生产者。


# 在原生RabbitMQ-client中演示这一过程:
@Slf4j public class AeTest { /**
     * 获取Channel
     */ private static final Channel CHANNEL = MqChannelUtils.getChannel(); /**
     * 备份交换机
     */ private static final String X_AE = "X_AE"; /**
     * 备份交换机绑定的队列
     */ private static final String Q_AE = "Q_AE"; /**
     * 正常业务的交换机
     */ private static final String X_1 = "X_1"; public static void main(String[] args) throws IOException { // 定义备份交换机-其实也是一个正常的交换机 CHANNEL.exchangeDeclare(X_AE, BuiltinExchangeType.FANOUT, true); // 定义备份队列 CHANNEL.queueDeclare(Q_AE, true, false, false, null); // 绑定备份 CHANNEL.queueBind(Q_AE, X_AE, "");

        HashMap arguments = new HashMap<>(); // 绑定的备份交换机 arguments.put("alternate-exchange", X_AE); // 定义交换机 CHANNEL.exchangeDeclare(X_1, BuiltinExchangeType.TOPIC, false, false, arguments); // 添加监听器,看看是否还会return消息 CHANNEL.addReturnListener(new ReturnCallback() { @Override public void handle(Return returnMessage) {
                log.error("消息被退回{}", returnMessage);
            }
        }); // 尝试向交换机发送消息(无法路由)- mandatory参数无效 CHANNEL.basicPublish(X_1, "", false, false, new AMQP.BasicProperties(), "阿依古丽".getBytes(StandardCharsets.UTF_8));
    }
}
  • 两个交换机,正常的交换机X_1和备份交换机X_AE

RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)

RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)

RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)

  • 备份交换机绑定的队列已经接收到了路由失败的消息

RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)

  • 其他要注意的点:

    • 备份交换机的Type设置为fanout比较合适,这样可以忽略RoutingKey,避免备份交换机又路由失败。
    • 被投递到备份交换机的RoutingKey为消息投递到MQ时的原始RoutingKey,不会变,这一点在其他场景下也是一样的。
    • 使用备份交换机模式,mandatory将无效,即就算mandatory设置为false,路由失败的消息同样会被投递到绑定的备份交换机。

# 源代码

https://gitee.com/FutaoSmile/tech-sharing-mq


免责声明:本文内容由21ic获得授权后发布,版权归原作者所有,本平台仅提供信息存储服务。文章仅代表作者个人观点,不代表本平台立场,如有问题,请联系我们,谢谢!

本站声明: 本文章由作者或相关机构授权发布,目的在于传递更多信息,并不代表本站赞同其观点,本站亦不保证或承诺内容真实性等。需要转载请联系该专栏作者,如若文章内容侵犯您的权益,请及时联系本站删除。
换一批
延伸阅读

LED驱动电源的输入包括高压工频交流(即市电)、低压直流、高压直流、低压高频交流(如电子变压器的输出)等。

关键字: 驱动电源

在工业自动化蓬勃发展的当下,工业电机作为核心动力设备,其驱动电源的性能直接关系到整个系统的稳定性和可靠性。其中,反电动势抑制与过流保护是驱动电源设计中至关重要的两个环节,集成化方案的设计成为提升电机驱动性能的关键。

关键字: 工业电机 驱动电源

LED 驱动电源作为 LED 照明系统的 “心脏”,其稳定性直接决定了整个照明设备的使用寿命。然而,在实际应用中,LED 驱动电源易损坏的问题却十分常见,不仅增加了维护成本,还影响了用户体验。要解决这一问题,需从设计、生...

关键字: 驱动电源 照明系统 散热

根据LED驱动电源的公式,电感内电流波动大小和电感值成反比,输出纹波和输出电容值成反比。所以加大电感值和输出电容值可以减小纹波。

关键字: LED 设计 驱动电源

电动汽车(EV)作为新能源汽车的重要代表,正逐渐成为全球汽车产业的重要发展方向。电动汽车的核心技术之一是电机驱动控制系统,而绝缘栅双极型晶体管(IGBT)作为电机驱动系统中的关键元件,其性能直接影响到电动汽车的动力性能和...

关键字: 电动汽车 新能源 驱动电源

在现代城市建设中,街道及停车场照明作为基础设施的重要组成部分,其质量和效率直接关系到城市的公共安全、居民生活质量和能源利用效率。随着科技的进步,高亮度白光发光二极管(LED)因其独特的优势逐渐取代传统光源,成为大功率区域...

关键字: 发光二极管 驱动电源 LED

LED通用照明设计工程师会遇到许多挑战,如功率密度、功率因数校正(PFC)、空间受限和可靠性等。

关键字: LED 驱动电源 功率因数校正

在LED照明技术日益普及的今天,LED驱动电源的电磁干扰(EMI)问题成为了一个不可忽视的挑战。电磁干扰不仅会影响LED灯具的正常工作,还可能对周围电子设备造成不利影响,甚至引发系统故障。因此,采取有效的硬件措施来解决L...

关键字: LED照明技术 电磁干扰 驱动电源

开关电源具有效率高的特性,而且开关电源的变压器体积比串联稳压型电源的要小得多,电源电路比较整洁,整机重量也有所下降,所以,现在的LED驱动电源

关键字: LED 驱动电源 开关电源

LED驱动电源是把电源供应转换为特定的电压电流以驱动LED发光的电压转换器,通常情况下:LED驱动电源的输入包括高压工频交流(即市电)、低压直流、高压直流、低压高频交流(如电子变压器的输出)等。

关键字: LED 隧道灯 驱动电源
关闭