黄色网址大全免费-黄色网址你懂得-黄色网址你懂的-黄色网址有那些-免费超爽视频-免费大片黄国产在线观看

專(zhuān)注Java教育14年 全國(guó)咨詢(xún)/投訴熱線(xiàn):400-8080-105
動(dòng)力節(jié)點(diǎn)LOGO圖
始于2009,口口相傳的Java黃埔軍校
首頁(yè) hot資訊 使用RabbitMQ配置死信隊(duì)列

使用RabbitMQ配置死信隊(duì)列

更新時(shí)間:2022-05-10 10:18:20 來(lái)源:動(dòng)力節(jié)點(diǎn) 瀏覽3607次

當(dāng)發(fā)生以下情況之一時(shí),來(lái)自消息隊(duì)列的可能是“死信”:

消息被拒絕并且重新排隊(duì)設(shè)置為 false

消息的 TTL 過(guò)期

超出隊(duì)列長(zhǎng)度限制

為了通過(guò)示例進(jìn)行演示,我選擇了第一種情況,即消息被拒絕。生產(chǎn)者將PaymentOrders作為消息發(fā)送,這些消息將由消費(fèi)者處理。當(dāng)PaymentOrder付款人賬戶(hù)資金不足時(shí),消息將被拒絕。

生產(chǎn)者

生產(chǎn)者是一個(gè) Spring Boot 應(yīng)用程序,它使用Spring AMQP庫(kù)向PaymentOrderRabbitMQ 發(fā)送消息。

生產(chǎn)者的 API

生產(chǎn)者 API 的第一部分是定義交換器的名稱(chēng)、路由密鑰、傳入和死信隊(duì)列。

public class Constants {
    public static final String EXCHANGE_NAME = "payment-orders.exchange";
    public static final String ROUTING_KEY_NAME = "payment-orders";
    public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
    public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
}

第二部分是定義消息格式。我們?cè)诖耸纠惺褂?JSON。以下 JSON 文檔顯示了我們?nèi)绾谓aymentOrder

{
  "from":"SA54 22PS JCLV 7LWT 7LHY EBLO",
  "to":"IT23 K545 5414 339G WLPI 2YF6 VBP",
  "amount":54.75
}

請(qǐng)注意,最好不要使用自定義序列化格式(如有效負(fù)載的 Java 序列化),因?yàn)檫@意味著您需要有一個(gè)基于 Java 的使用者。好的做法是將有效負(fù)載格式化為 JSON。每個(gè)平臺(tái)和/或語(yǔ)言都可以解析 JSON。

生產(chǎn)者配置

我們需要配置 AMQP 基礎(chǔ)設(shè)施。死信隊(duì)列配置封裝在傳入隊(duì)列聲明中。

有一個(gè)死信交換direct(DLX) 的概念,它是類(lèi)型topic或的正常交換fanout。如果在處理從隊(duì)列中獲取的消息期間發(fā)生故障,RabbitMQ 會(huì)檢查是否為該隊(duì)列配置了死信交換。如果通過(guò)x-dead-letter-exchange參數(shù)配置了一個(gè),那么它將使用原始路由密鑰將失敗的消息路由到它??梢酝ㄟ^(guò)x-dead-letter-routing-key參數(shù)覆蓋此路由鍵。

在此示例中,我們使用default exchange(no-name) 作為 the dead letter exchange,并使用死信隊(duì)列名稱(chēng)作為新的路由鍵。這將起作用,因?yàn)槿魏侮?duì)列都綁定到默認(rèn)交換,綁定鍵等于隊(duì)列名稱(chēng)。

@Configuration
public class AmqpConfig {
    @Bean
    DirectExchange exchange() {
        return new DirectExchange(Constants.EXCHANGE_NAME);
    }
    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
                .build();
    }
    @Bean
    Binding binding() {
        return BindingBuilder.bind(incomingQueue()).to(exchange()).with(Constants.ROUTING_KEY_NAME);
    }
    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(Constants.DEAD_LETTER_QUEUE_NAME).build();
    }
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

用于隊(duì)列和交換的構(gòu)建器 API 非常方便,并且從 Spring AMQP 庫(kù)的 1.6 版本開(kāi)始可用。

在 RabbitMQ 管理控制臺(tái)中,DLX和DLK標(biāo)簽指示在傳入隊(duì)列上設(shè)置了dead letter exchange和dead letter routing key參數(shù)。

生產(chǎn)者邏輯

生產(chǎn)者每 5 秒生成一次隨機(jī)PaymentOrder消息,這些消息被發(fā)送到 RabbitMQ 進(jìn)行進(jìn)一步處理。SpringAmqpTemplate是自動(dòng)配置的,它可以連接到我們的組件中。由于消息格式是 JSON ,Jackson2JsonMessageConverter因此定義了它將自動(dòng)關(guān)聯(lián)到 auto-configured AmqpTemplate。

@Component
public class Producer {
    private AmqpTemplate amqpTemplate;
    public Producer(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }
    @Scheduled(fixedDelay = 1000L)
    public void send() {
        PaymentOrder paymentOrder = new PaymentOrder(
                Iban.random().toFormattedString(),
                Iban.random().toFormattedString(),
                new BigDecimal(1D + new Random().nextDouble() * 100D).setScale(2, BigDecimal.ROUND_FLOOR));
        amqpTemplate.convertAndSend(Constants.EXCHANGE_NAME, Constants.ROUTING_KEY_NAME, paymentOrder);
    }
}

消費(fèi)者

對(duì)于這個(gè)簡(jiǎn)單的示例,消費(fèi)者也是一個(gè) Spring Boot 應(yīng)用程序,但在實(shí)際應(yīng)用程序中,消費(fèi)者和生產(chǎn)者不必在同一平臺(tái)/語(yǔ)言上。

消費(fèi)者 API

消費(fèi)者 API 的第一部分是指定它連接到哪個(gè)隊(duì)列。

public class Constants {
    public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
    public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
}

第二部分是適應(yīng)生產(chǎn)者定義的消息格式。請(qǐng)注意,在這種情況下,兩個(gè)應(yīng)用程序都是基于 Java 的,因此我可以創(chuàng)建一個(gè)包含PaymentOrder類(lèi)文件的 jar 文件并與消費(fèi)者和生產(chǎn)者共享它。然而,這是不好的做法,因?yàn)樗肓嘶诠蚕韼?kù)的緊密耦合。更好的方法是使用一些代碼重復(fù)(PaymentOrder在這種情況下為類(lèi))并通過(guò)同意消息格式來(lái)使用更松散的耦合方法。

public class PaymentOrder {
    String from;
    String to;
    BigDecimal amount;
    @JsonCreator
    public PaymentOrder(@JsonProperty("from") String from,
                        @JsonProperty("to") String to,
                        @JsonProperty("amount") BigDecimal amount) {
        this.from = from;
        this.to = to;
        this.amount = amount;
    }
    // getters and toString()
}

消費(fèi)者配置

消費(fèi)者只關(guān)心從中獲取消息的隊(duì)列。傳入隊(duì)列必須存在,否則消費(fèi)者將無(wú)法啟動(dòng)。請(qǐng)注意,dead letter queue消費(fèi)者啟動(dòng)時(shí)不必存在 ,但在消息需要“死信”時(shí)它應(yīng)該存在。如果它丟失,則消息將被靜默丟棄。

@Configuration
public class AmqpConfig {
    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
                .build();
    }
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

默認(rèn)情況下啟用重新排隊(duì)。為了“死信”消息,您需要將以下屬性設(shè)置為 false。

spring:
  rabbitmq:
    listener:
      default-requeue-rejected: false

但是,如果您想在某些錯(cuò)誤情況下啟用重新排隊(duì),最好保持啟用重新排隊(duì)并利用AmqpRejectAndDontRequeueException將發(fā)送basic.reject帶有 requeue=false 的選項(xiàng)。

消費(fèi)邏輯

每當(dāng)傳入隊(duì)列上有消息可用時(shí),將使用反序列化的實(shí)例process調(diào)用該方法。在這里,我們通過(guò)拋出一個(gè)擴(kuò)展異常PaymentOrder來(lái)模擬消息拒絕。InsufficientFundsExceptionAmqpRejectAndDontRequeueException

@Component
public class Consumer {
    @RabbitListener(queues = Constants.INCOMING_QUEUE_NAME)
    public void process(@Payload PaymentOrder paymentOrder) throws InsufficientFundsException {
        if (new Random().nextBoolean()) {
            throw new InsufficientFundsException("insufficient funds on account " + paymentOrder.getFrom());
        }
    }
}

下圖顯示了一條消息的示例,該P(yáng)aymentOrder消息被拒絕并最終進(jìn)入dead letter queue

有時(shí)它有助于自動(dòng)重試失敗的操作,以防它可能在后續(xù)嘗試中成功。RetryTemplateSpring AMQP 庫(kù)在Spring Retry項(xiàng)目(從 Spring Batch 中提取)的幫助下提供了對(duì)此的支持。Spring Boot 使配置變得非常容易,RetryTemplate如下面的示例所示。

spring:
  rabbitmq:
    listener:
      retry:
        enabled: true
        initial-interval: 2000
        max-attempts: 2
        multiplier: 1.5
        max-interval: 5000

使用上述配置,重試功能已啟用(默認(rèn)情況下禁用),最多應(yīng)有 2 次嘗試傳遞消息,第一次和第二次嘗試之間應(yīng)為 2 秒,稍后與上一次重試間隔乘以 1.5 和最多 5 秒。運(yùn)行您將在日志中看到的消費(fèi)者

2016-09-07 21:56:53.396  INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer            : Processing at 'Wed Sep 07 21:56:53 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'
2016-09-07 21:56:55.399  INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer            : Processing at 'Wed Sep 07 21:56:55 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'
2016-09-07 21:56:55.401  WARN 11995 --- [cTaskExecutor-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'{"from":"RS32 5346 0536 6006 4886 88","to":"FI61 8364 3364 9834 16","amount":45.57}' MessageProperties [headers={__TypeId__=com.example.producer.api.PaymentOrder}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=payment-orders.exchange, receivedRoutingKey=payment-orders, receivedDelay=null, deliveryTag=31, messageCount=0, consumerTag=amq.ctag-vd18OXS9PSOeJmBQLY4o-w, consumerQueue=payment-orders.incoming.queue])

結(jié)論

如您所見(jiàn),使用 RabbitMQ 配置死信隊(duì)列非常簡(jiǎn)單。如果大家想了解更多相關(guān)知識(shí),不妨來(lái)關(guān)注一下動(dòng)力節(jié)點(diǎn)的RabbitMQ教程,里面有更豐富的知識(shí)等著大家去學(xué)習(xí),希望對(duì)大家能夠有所幫助。

提交申請(qǐng)后,顧問(wèn)老師會(huì)電話(huà)與您溝通安排學(xué)習(xí)

免費(fèi)課程推薦 >>
技術(shù)文檔推薦 >>
主站蜘蛛池模板: 亚洲一区精品视频在线 | 成年人视频在线免费 | 翁熄性荡事最新篇王伟忠 | 天天爽夜夜爽人人爽免费 | 毛片观看网址 | 日日操日日摸 | 一级a性色生活片久久毛片 一级a美女毛片 | 一级毛片在线完整免费观看 | 成人在线视频免费看 | 色噜噜狠狠色综合日日 | 香蕉视频网页版 | 国内精品免费麻豆网站91麻豆 | 手机在线视频一区 | 亚洲日韩欧美一区二区在线 | 午夜在线观看完整高清免费 | 91精品国产高清久久久久久91 | 男女啪啪免费体验区 | 欧美xxxx色视频在线观看免费 | 成人精品免费视频 | 黑丝美足| 免费高清在线影片一区 | 717影院理伦午夜论八戒 | 欧美一级在线全免费 | 99视频在线免费 | 国产成在线观看免费视频 | 男女很黄很色床视频网站免 | 免费看三级黄色片 | 色噜噜狠狠狠综合曰曰曰88av | 五月天堂婷婷 | 亚洲丰满熟妇毛片在线播放 | www.日韩视频| 日韩一区二区三区在线观看 | 人人狠| 一级香蕉视频 | 日韩视频在线观看视频 | 一本大道无香蕉综合在线 | 国产自产视频 | 天天躁夜夜躁狠狠躁躁 | 在线午夜视频 | 国产成人精品一区二三区在线观看 | 成人动漫视频观看免费 |