MENU

RabbitMQ 入门

July 22, 2020 • Read: 419 • Java

基erLang语言 基于高可用 电信行业使用

两面性 高可用 源码困难

不生产消息 消息的搬运工 需要持久化保证高可用

规范性协议

AMQP Advanced Message Queuing Protocol

提供统一消息服务的应用层标准高级消息队列协议

连接 --》 信道(连接池有点类似)

基础概念术语

有效载荷 传递的真实消息

标签 给载荷打标签

交换器 路由键 绑定键 最大长度255字符

确认机制 自动确认 自行(手动)确认

如果消息路由不到指定队列 则丢弃 满足不了规则的 没有默认放在某个队列中

channel 绑上交换器 交换器上定义路由规则 队列 绑定交换器

消费者 绑上队列 消息本上传路由键

rabbitmq-model

四种交换器模式

Direct 完全匹配 比如 routingKey = king 那么绑定路由键 =king

fanout 广播队列

topic 路由键中的 * 和# 用 . 将路由键分为几个表示 有点模糊查询的感觉

匹配一个 king. king.kafka.1 不能匹配到 # 匹配多个 king.# king.kafka.1 可以匹配到

​ king. . 可以匹配后面两个分割 king.kafka.1

header(direct几乎一样 不常使用)

原生客户端API依赖

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.0.0</version>
</dependency>

消息发布时候的权衡

针对推模式 生产者生产消息到exchange上面之后 MQ需要给予怎么样的回应

rabbitmq-balanced

失败通知

配置参数 mandatory = true

channel.addReturnListener(new ReturnListener()

channel.basicPublish(EXCHANGE_NAME,routekey,true,null,message.getBytes());

失败通知 可以记录入库 定时任务重新做 或者直接重试

加入事务

性能下降2-10倍 产生了同步

针对channel增加事务

txSelect 开启事务 txCommit 提交 txRollback 回滚

发送方确认

一般确认 、批量确认、异步监听确认

//一般确认
channel.confirmSelect();//开启确认模式
channel.watiForConfirms();
//批量确认 如果其中条目数发送失败 触发失败通知 不会阻塞住
channel.confirmSelect();
channel.waitForConfirmsOrDie();
//异步监听确认
// 添加发送者确认监听器
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
   //TODO 成功
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
       System.out.println("send_ACK:"+deliveryTag+",multiple:"+multiple);
    }
   //TODO 失败 RabbitMQ内部错误了 才会触发NACK
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Erro----send_NACK:"+deliveryTag+",multiple:"+multiple);
    }
});

备用交换器

绑定备用交换器 那么失败通知 暂时不触发 先去备用了

        Map<String,Object> argsMap = new HashMap<String,Object>();
        argsMap.put("alternate-exchange",BAK_EXCHANGE_NAME);
        //主交换器
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT,
                false,false,argsMap);
        //备用交换器
        channel.exchangeDeclare(BAK_EXCHANGE_NAME,BuiltinExchangeType.FANOUT,
                true,false,null);

rabbitmq-rebalanceAll

消息消费时候权衡

消息获得方式 GET拉取 需要不停while

​ Consume 推送 消息到达RMQ 直接推送给消费者

消息应答 自动确认 手动确认

get效率较低 避免拉取

GetResponse getResponse = channel.basicGet(queueName, false);

手动确认/自动确认

关闭自动确认如果没有手动确认的话 在RMQ看来 消息是没有被消费成功 在下一次启动新的符合条件的消费者时候 还是会收到这些消息

重复消费的根本原因 就是没有进行相关的确认

channel.basicAck(envelope.getDeliveryTag(),false);

Qos预拉取

与批量确认的区别 代码程度以及拉取数量级别不同 Qos优化版的批量

事务

消息消费拒绝方式

Reject / Nack 单条/批量

重新投递参数 requeue = true 如果是false 直接丢弃消息

channel.basicReject(envelope.getDeliveryTag(),true);

客户端配置 第二个参数就是配置是否重新投递

channel.basicNack(envelope.getDeliveryTag(), false, false);

中间的参数 配置是否批量

死信交换器DLX

死亡信息的交换器 通常绑定在消费端 绑个队列

1.消息过期

2.消息队列达到最大长度(头丢弃 最先进去的 FIFO 队列)

3.投递且返回拒绝 不再重新投递

备用&死信区别:

备用 信息并没有进入队列处理无法路由的信息

死信是明确被队列拒绝了

备用是和主交换器联系 死信是和声明的队列发生关系的

rabbitmq-dlx-info

临时队列

内存中

        String queueName = "setQueue";
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-expires",10*1000);//10秒被删除
        //TODO 队列的各种参数
        /*加入队列的各种参数*/
        channel.queueDeclare(queueName,false,false, true,arguments);//durable exclusive autoDelete
  • 自动删除队列 autoDelete

​ 自动删除条件 : 1.消费者断开 2.到期时间

  • 单消费者队列 exclusive 有且只有一个消费者
  • 自动过期队列 x-expires设置 + autoDelete = false

时间是怎么算的? 未使用 --》 1.队列没有GET操作 2.队列没有consumer连接 (生产者扔投递消息) 是一个不及时的操作

持久化队列

内存--》数据存在磁盘中 durable 性能很低

队列参数

x-expires 队列指定过期时间删除

x-message-ttl 队列级别默认消息过期时间

x-max-priority 设置投递消息的优先级 默认FIFO 实现插队 很鸡肋 只能255个排序

rabbit-queueParams

信息参数

通过 AMQP.BasicProperties 进行相关设置

expiration 过期时刻 时间戳 队列和信息都有失效时间 取小的为准

reply-to 一问一答 生产者得到消费者的应答 借助属性 这种模式背离了消息队列的本质 只做了解

rabbit-messageParams.jpg

Spring集成

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.0.0.RELEASE</version>
        </dependency>

生产者配置

  1. rabbitmq配置

    <bean id="rabbitConnectionFactory"
         class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
            <constructor-arg value="127.0.0.1"/>
            <property name="username" value="guest"/>
            <property name="password" value="guest"/>
            <property name="channelCacheSize" value="8"/>
            <property name="port" value="5672"></property>
    </bean>
  2. 配置 rabbit:admin rabbitConnectionFactory
  3. 配置 消息模板类 rabbitTemplate 主要也是用这个来发送消息
  4. 队列 以及交换器 配置

参考:rq-order 项目配置

消费者配置

  1. rabbitmq配置
  2. 配置 rabbit:admin
  3. 配置队列 和交换器
  4. 配置消费者的bean 也就是service 实现
public class H1_Service implements MessageListener{
    private Logger logger = LoggerFactory.getLogger(H1_Service.class);
    public void onMessage(Message message) {
        logger.info("Get message: "+new String( message.getBody()));
    }
}
  1. 配置监听器 也就是service 和队列进行绑定
  <rabbit:listener-container connection-factory="rabbitConnectionFactory">
        <!-- 配置文件,消费者监听对应的队列(fanout类型交换器绑定的队列)  -->
        <rabbit:listener ref="h1_Service" queues="h1_queue" method="onMessage"/>
        <!-- 直接方式,消费者监听对应的队列(topic绑类型交换器绑定的队列)  -->
        <rabbit:listener ref="allTopicService" queues="all_queue" method="onMessage"/>
    </rabbit:listener-container>

MessageListener 自动确认

ChannelAwareMessageListener 感知channel 完成手动确认

通常情况下需要做的是:

​ 生产:发送方确认ConfirmCallback 失败通知ReturnCallback 持久化(交换器 队列 消息三者)

​ 消费:手动应答ChannelAwareMessageListener

参考:rp-depot配置

如何保证消息的顺序? 生产 MQ 消费者

队列下多个消费者 拆分单队列 单消费者 一对一 保证有序 exclusive参数配置

延时任务 延时订单 TTL+DLX

1.比如实现30分钟后推送一条消息 先发送一条正常消息 设置TTL值 没有任何消费者 时间到了 触发 DLX 路由死信队列进行消费 完成30分钟后的任务

2.rabbitmq 有插件 没用过

秒杀

全部请求都接受到MQ 但是消费者拉取固定平稳速率 流量削峰

集群化

创建用户是没有权限的 需要添加权限set_permissions 和 角色 set_user_tag

rabbitmqctl add_user username password

rabbitmqctl list_users

配置权限 写权限 读权限

集群是基于erlang生态的 需要保证/var/lib/rabbitmq/.erlang.cookie 几个节点机器是一样的才可以工作

普通集群

元数据相互之间复制 交换器也复制 但是队列里面的信息不同步1.性能 2.存储空间

如果A 连接102节点 如果转去101连接 101会把请求转发给102处理

如果102挂了 需要程序员手动切换到101上面 很鸡肋 可以用HA来解决

普通集群需要至少有一个磁盘节点才可以工作 如果只有一个磁盘+一个内存基本不可以CURD队列 只能正常维持工作

node01 node02

02:

stop_app

reset

join_cluster rabbit@node01 (--ram) 默认disc磁盘节点 加上--ram 表示内存节点

start_app ---》 cluster_status

镜像化

镜像队列 + 普通集群才能达到真正的高可用

队列同步 镜像队列 消息备份 实现高可用

1.通过代码构建

x-ha-policy 声明队列增加参数 值为all 表示队列镜像到所有节点

exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定

arguments.put("x-ha-policy","nodes");
arguments.put("x-ha-nodes","[rabbit@node1,rabbit2@node2]");//表明只镜像了两个节点机

2.通过rabbitmq生成策略实现 或者 后台命令行配置

例如,对队列名称以“queue_”开头的所有队列进行镜像,并在集群的两个节点上完成进行,policy的设置命令为:
rabbitmqctl set_policy ha-queue-two '^queue_' '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

优点:高可用 完全镜像

缺点:同步信息带宽开销太大 没什么拓展性

https://zhuanlan.zhihu.com/p/92592459

HAproxy

集群化 + 负载均衡

主要模块配置为: 连接的就是5670节点

listen rabbitmq_cluster

     bind 0.0.0.0:5670
     mode tcp
     balance roundrobin
     server rabbit01 127.0.0.1:5672 check inter 5000 rise 2 fall 3
     server rabbit02 node2:5672 check inter 5000 rise 2 fall 3

https://www.cnblogs.com/sky-cheng/p/10749189.html