博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ系列(五)--高级特性
阅读量:5018 次
发布时间:2019-06-12

本文共 4061 字,大约阅读时间需要 13 分钟。

在上一篇文章讲解MQ消息可靠性投递和幂等性中有提到confirm机制的重要性,现在更相信的说明一下

一、Confirm机制

  Confirm就是消息确认,当Producer发送消息,如果Broker收到消息,会回复一个应答,我们可以以此来确认消息是否成功送达,是保证

消息可靠性投递的核心保障

Producer代码如下,只需要修改Producer端,而Consumer端不需要修改

//4 指定我们的消息投递模式: 消息的确认模式channel.confirmSelect();//5 发送一条消息String msg = "Hello RabbitMQ Send confirm message!";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());//6 添加一个确认监听channel.addConfirmListener(new ConfirmListener() {    @Override    public void handleNack(long deliveryTag, boolean multiple) throws IOException {        System.err.println("-------no ack!-----------");    }    @Override    public void handleAck(long deliveryTag, boolean multiple) throws IOException {        System.err.println("-------ack!-----------");    }});

结果:

-------ack!-----------

只要Producer能把消息发送给Broker,就会返回handlerAck中,返回到NAck的可能很小,例如MQ出现异常,queue的容量达到上限

二、Return消息机制

Return Listener用于处理一些不可路由的消息

Producer:

public class Producer {    public static void main(String[] args) throws Exception {        //1 创建ConnectionFactory        ConnectionFactory factory = new ConnectionFactory();        factory.setVirtualHost("/");        factory.setHost("139.196.75.238");        factory.setPort(5672);        //2 获取Connection        Connection connection = factory.newConnection();        //3 通过Connection创建一个新的Channel        Channel channel = connection.createChannel();        String exchangeName = "exchange_topic";        String routingKey = "fdasfdsafsadf4543453";        //6 添加一个return监听        channel.addReturnListener(new ReturnListener() {            @Override            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.err.println("---------handle  return----------");                System.err.println("replyCode: " + replyCode);                System.err.println("replyText: " + replyText);                System.err.println("exchange: " + exchange);                System.err.println("routingKey: " + routingKey);                System.err.println("properties: " + properties);                System.err.println("body: " + new String(body));            }        });        //5 发送一条消息        String msg = "Hello RabbitMQ Send confirm message!";        channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());    }}
Producer Return

结果:

---------handle  return----------replyCode: 312replyText: NO_ROUTEexchange: exchange_topicroutingKey: fdasfdsafsadf4543453properties: #contentHeader
(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)body: Hello RabbitMQ Send confirm message!

注意:

  channel.basicPublish参数里面一定要把Mandatory设置为true,才能收到监听不可达的消息(创建exchange、routingKey不匹配等问题

,导致不可达),然后进行后续处理,如果为false,broker自动删除该消息,上面例子就是routingKey设置不匹配,Consumer的代码就不给了

三、消息端限流

限流一般无法从生产端,只能在消费端处理

在Consumer端设置:

channel.basicQos(0, 1, false);channel.basicConsume(queueName, false, new MyConsumer(channel));

qos:

  服务质量保证,在非自动确认情况下,一定数目的消息没有确认,不进行消费新的消息,通过producer/consumer设置qos的值

channel.basicQos(prefetchSize, prefetch_count, global);

注意:

  prefetchSize和global,rabbitMQ没有实现,默认0表示对单条message的大小没有限制、false(非channel级别,consumer级别)

  channel.basicConsume中自动签收一定要设置成false

  prefetch_count表示一次给几条进行消费,直到返回ack,才能继续给prefetch_count条message

在MyConsumer中手动签收

public class MyConsumer extends DefaultConsumer {    private Channel channel;    public MyConsumer(Channel channel) {        super(channel);        this.channel = channel;    }    @Override    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {        System.err.println("-----------consume message----------");        System.err.println("body: " + new String(body));        channel.basicAck(envelope.getDeliveryTag(), false);    }}

四、TTL

五、死信队列

未完待续。。。

 

转载于:https://www.cnblogs.com/huigelaile/p/10917709.html

你可能感兴趣的文章
面试题61 把二叉树打印成多行
查看>>
第二章 Vue快速入门--20 品牌案例-完成品牌列表的添加功能+ 21 品牌案例-根据Id完成品牌的删除...
查看>>
Codeforces Round #327 (Div. 2)
查看>>
ODAC(V9.5.15) 学习笔记(三)TOraSession(2)
查看>>
SQL中的replace函数
查看>>
java中的类型安全问题-Type safety: Unchecked cast from Object to ...
查看>>
如何解决最后一个尾注引用显示与致谢混为一谈的问题-下
查看>>
css文本样式text、字体样式font
查看>>
python判断图片是否损坏
查看>>
MySQL服务启动:某些服务在未由其他服务或程序使用时将自动停止
查看>>
KNN与SVM对比&SVM与逻辑回归的对比
查看>>
团队个人冲刺第三天
查看>>
2017-10-17 NOIP模拟赛2
查看>>
How to install ia32-libs in Ubuntu 14.04 LTS (Trusty Tahr)
查看>>
The Ctrl & CapsLock `problem'
查看>>
linux故障判断
查看>>
Java进阶知识点6:并发容器背后的设计理念 - 锁分段、写时复制和弱一致性
查看>>
Makefile ===> Makefile 快速学习
查看>>
face detection[HR]
查看>>
java性能调优工具
查看>>