RabbitMQ知识点

1、消息应答的方法:

  1. Channel.basicAck(用于肯定确认),RabbitMQ已知道该消息并且成功处理,可以将其丢弃
  2. Channel.basicNack(用于否定确认)
  3. Channel.basicReject(用于否定确认),与Channel.basicNack相比少了一个参数,不处理该消息了,直接拒绝,可以将其丢弃了。

2、Multiple的解释

  1. true表示批量应答channel上未应答的消息,比如channel上有传送tag的消息5,6,7,8,,当前tag是8,那么此时5-8的这些还未应答的消息就会被确认收到消息应答
  2. false同上面相比只会应答tag=8的消息,5,6,7这三个消息依然不会被确认收到消息应答

手动应答的好处是可以批量应答并且减少网络拥堵 。

3、消息手动应答代码

//getEnvelope()属性,getDeliveryTag()标记
/*
 * 参数1:消息的标记  tag
 * 参数2:是否批量应答,false:不批量应答;true:批量
 * */
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

// 采用手动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,(consumerTag) -> {
    System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
});

4、持久化

// 声明队列
// 持久化 需要让Queue持久化
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);

注意:

  1. 如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误。
  2. 将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。

5、不公平分发

//不公平分发channel
//basicQos(0) --> 轮询分发(公平分发),默认值为0
//channel.basicQos(1) --> 不公平分发,能者多劳
//channel.basicQos(>2) --> 预取值
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

6、发布确认

发布确认的策略:

开启发布确认的方法:发布确认默认是没有开启的,如果要开启,需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法。

//开启发布确认
channel.confirmSelect();

6.1单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。

/**
 * 单个发送
 */
public static void publishMessageIndividually() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, true, false, false, null);
    //开启发布确认
    channel.confirmSelect();

    long begin = System.currentTimeMillis();

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        //服务端返回 false 或超时时间内未返回,生产者可以消息重发
        //单个消息马上进行发布确认
        boolean flag = channel.waitForConfirms();
        if (flag) {
            System.out.println("消息发送成功");
        }
    }

    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");

}

6.2批量确认发布

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量。

缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

/**
 * 批量确认发送
 */
public static void publishMessageBatch() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, true, false, false, null);
    //开启发布确认
    channel.confirmSelect();
    //批量确认消息大小
    int batchSize = 100;
    //未确认消息个数
    int outstandingMessageCount = 0;
    long begin = System.currentTimeMillis();

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        outstandingMessageCount++;
        if (outstandingMessageCount == batchSize) {
            channel.waitForConfirms();
            outstandingMessageCount = 0;
        }
    }
    //为了确保还有剩余没有确认消息 再次确认
    if (outstandingMessageCount > 0) {
        channel.waitForConfirms();
    }
    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}

6.3异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

/**
 * 异步确认发布
 *
 */
public static void publishMessageByAsync() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();

    String queueName = UUID.randomUUID().toString();
    // 生成一个队列
    channel.queueDeclare(queueName, false, false, false, null);

    // 开启发布确认
    channel.confirmSelect();

    long begin = System.currentTimeMillis();

    // 消息确认成功回调
    /**
     * 参数1。消息的标记
     * 参数2.是否为批量确认
     */
    ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
        System.out.println(String.format("确认的消息:" + deliveryTag));
    };

    // 消息确认失败回调
    /**
     * 参数1。消息的标记
     * 参数2.是否为批量确认
     */
    ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
        System.out.println(String.format("未确认的消息:" + deliveryTag));
    };

    // 添加异步监听器 监听消息是否成功
    /**
     * 参数1。监听哪些消息成功了
     * 参数2.监听哪些消息失败了
     */
    channel.addConfirmListener(ackCallback, nackCallback);

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
    }

    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
}

处理异步未确认消息:

最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

/**
  * 异步确认
  * 消息确认失败时的处理
  * @throws Exception
  */
public static void publishMessageByAsyncNackCallback() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();

    String queueName = UUID.randomUUID().toString();
    // 生成一个队列
    channel.queueDeclare(queueName, false, false, false, null);

    // 开启发布确认
    channel.confirmSelect();

    /**
         * 线程安全有序的一个哈希表,适用于高并发的情况下  高并发下的hashMap
         * 1.轻松的将序号与消息进行关联
         * 2.可以轻松的批量的删除条目,只需要给到序号
         * 3.支持高并发(多线程)
         */
    ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

    // 消息确认成功回调
    /**
         * 参数1。消息的标记
         * 参数2.是否为批量确认
         */
    ConfirmCallback ackCallback = (deliveryTag, multiple) -> {

        //判断是否进行批量操作
        if (multiple) {
            //2.删除已经确认的消息  剩下的就是确认失败的消息
            //headMap标记在头部
            ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
            confirmed.clear();
        } else {
            outstandingConfirms.remove(deliveryTag);
        }

        System.out.println("确认的消息:" + deliveryTag);
    };

    // 消息确认失败回调
    /**
         * 参数1。消息的标记
         * 参数2.是否为批量确认
         */
    ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
        //3.打印一下未确认的消息
        String message = outstandingConfirms.get(deliveryTag);
        System.out.println("未确认的消息是:" + message + ",未确认消息的tag是:" + deliveryTag);
    };

    // 添加异步监听器 监听消息是否成功
    /**
         * 参数1。监听哪些消息成功了
         * 参数2.监听哪些消息失败了
         */
    channel.addConfirmListener(ackCallback, nackCallback);

    //开始时间
    long begin = System.currentTimeMillis();

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());

        //1.记录所有要发送的消息
        //getNextPublishSeqNo下一次发布的序号
        outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
    }

    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
}

完整代码:

package com.youzi.rabbitmq.four;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.youzi.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * @Author youzi
 * @ClassName ConfirmMessage
 * @Date 2022/7/4 10:34
 * @Description //TODO
 */
public class ConfirmMessage {

    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //单个发送  22240ms
        ConfirmMessage.publishMessageIndividually();

        //批量确认  311ms
        ConfirmMessage.publishMessageBatch();

        //异步确认  51ms
        ConfirmMessage.publishMessageByAsync();

        //异步确认加失败处理  49ms
        ConfirmMessage.publishMessageByAsyncNackCallback();
    }

    /**
     * 单个发送
     *
     * @throws Exception
     */
    public static void publishMessageIndividually() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        String queueName = UUID.randomUUID().toString();

        channel.queueDeclare(queueName, true, false, false, null);

        //开启发布确认
        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            //服务端返回 false 或超时时间内未返回,生产者可以消息重发
            //单个消息马上进行发布确认
            boolean flag = channel.waitForConfirms();
            if (flag) {
                System.out.println("消息发送成功");
            }
        }

        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
    }


    /**
     * 批量确认
     *
     * @throws Exception
     */
    public static void publishMessageBatch() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        String queueName = UUID.randomUUID().toString();

        channel.queueDeclare(queueName, true, false, false, null);

        //开启发布确认
        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        //批量确认消息大小
        int bachSize = 100;

        //批量发布消息  批量确认
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());

            //判断达到100条消息的时候,批量确认一次
            if (i % bachSize == 0) {
                //发布确认
                channel.waitForConfirms();
            }
        }

        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
    }


    /**
     * 异步确认
     *
     * @throws Exception
     */
    public static void publishMessageByAsync() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        String queueName = UUID.randomUUID().toString();
        // 生成一个队列
        channel.queueDeclare(queueName, false, false, false, null);

        // 开启发布确认
        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        // 消息确认成功回调
        /**
         * 参数1。消息的标记
         * 参数2.是否为批量确认
         */
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            System.out.println(String.format("确认的消息:" + deliveryTag));
        };

        // 消息确认失败回调
        /**
         * 参数1。消息的标记
         * 参数2.是否为批量确认
         */
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            System.out.println(String.format("未确认的消息:" + deliveryTag));
        };

        // 添加异步监听器 监听消息是否成功
        /**
         * 参数1。监听哪些消息成功了
         * 参数2.监听哪些消息失败了
         */
        channel.addConfirmListener(ackCallback, nackCallback);

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
        }

        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
    }


    /**
     * 异步确认
     * 消息确认失败时的处理
     * @throws Exception
     */
    public static void publishMessageByAsyncNackCallback() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        String queueName = UUID.randomUUID().toString();
        // 生成一个队列
        channel.queueDeclare(queueName, false, false, false, null);

        // 开启发布确认
        channel.confirmSelect();

        /**
         * 线程安全有序的一个哈希表,适用于高并发的情况下  高并发下的hashMap
         * 1.轻松的将序号与消息进行关联
         * 2.可以轻松的批量的删除条目,只需要给到序号
         * 3.支持高并发(多线程)
         */
        ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

        // 消息确认成功回调
        /**
         * 参数1。消息的标记
         * 参数2.是否为批量确认
         */
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {

            //判断是否进行批量操作
            if (multiple) {
                //2.删除已经确认的消息  剩下的就是确认失败的消息
                //headMap标记在头部
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
                confirmed.clear();
            } else {
                outstandingConfirms.remove(deliveryTag);
            }

            System.out.println("确认的消息:" + deliveryTag);
        };

        // 消息确认失败回调
        /**
         * 参数1。消息的标记
         * 参数2.是否为批量确认
         */
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            //3.打印一下未确认的消息
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息是:" + message + ",未确认消息的tag是:" + deliveryTag);
        };

        // 添加异步监听器 监听消息是否成功
        /**
         * 参数1。监听哪些消息成功了
         * 参数2.监听哪些消息失败了
         */
        channel.addConfirmListener(ackCallback, nackCallback);

        //开始时间
        long begin = System.currentTimeMillis();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());

            //1.记录所有要发送的消息
            //getNextPublishSeqNo下一次发布的序号
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
        }

        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
    }
}

以上三种发布确认对比

发布确认方案对比
单独发布确认消息同步等待确认,简单,但吞吐量非常有限
批量发布确认消息批量同步等待,简单,合理的吞吐量,出现问题无法确认是哪条消息异常
异步发布确认消息最佳性能和资源使用,可以异步处理成功和失败的消息

7、交换机

7.1、Fanout模式(发布/订阅模式)

概念:它是将接收到的所有消息广播到它知道的所有队列中。

生产者代码:

package com.youzi.rabbitmq.five;

import com.rabbitmq.client.Channel;
import com.youzi.rabbitmq.utils.RabbitMqUtils;

import java.util.Scanner;

/**
 * @Author youzi
 * @ClassName Emitlog
 * @Date 2022/7/4 16:15
 * @Description //TODO
 */
public class Emitlog {
    //交换机名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:" + message);
        }
    }
}

消费者1代码:

package com.youzi.rabbitmq.five;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.DeliverCallback;
import com.youzi.rabbitmq.utils.RabbitMqUtils;
import com.youzi.rabbitmq.utils.SleepUtils;

/**
 * @Author youzi
 * @ClassName ReceiveLogs01
 * @Date 2022/7/4 16:15
 * @Description //TODO 消费者1
 */
public class ReceiveLogs01 {

    //交换机名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //声明一个临时队列
        /**
         * 生成一个临时队列,队列的名称是随机的
         * 当消费者断开与队列的连接时,队列就自动删除了
         */
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机与队列
        /**
         * 参数1.队列名称
         * 参数2.交换机名称
         * 参数3.routingKey值
         */
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println("等待接收消息,把接收到的消息放在屏幕上。。。。。。");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogs01接收到的消息:" + new String(message.getBody()));
        };

        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

消费者2代码:

package com.youzi.rabbitmq.five;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.youzi.rabbitmq.utils.RabbitMqUtils;

/**
 * @Author youzi
 * @ClassName ReceiveLogs02
 * @Date 2022/7/4 16:15
 * @Description //TODO
 */
public class ReceiveLogs02 {

    //交换机名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //声明一个临时队列
        /**
         * 生成一个临时队列,队列的名称是随机的
         * 当消费者断开与队列的连接时,队列就自动删除了
         */
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机与队列
        /**
         * 参数1.队列名称
         * 参数2.交换机名称
         * 参数3.routingKey值
         */
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println("等待接收消息,把接收到的消息放在屏幕上。。。。。。");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogs02接收到的消息:" + new String(message.getBody()));
        };

        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

7.2、Direct 模式(路由模式)

概念:消息只去到它绑定的 routingKey 队列中去。

生产者代码:

package com.youzi.rabbitmq.six;
import com.rabbitmq.client.Channel;
import com.youzi.rabbitmq.utils.RabbitMqUtils;
import java.util.Scanner;

/**
 * @Author youzi
 * @ClassName DirectLogs
 * @Date 2022/7/4 17:23
 * @Description //TODO
 */
public class DirectLogs {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()) {
            String message = scanner.next();
            //第二个参数可以填入info,warning,error,填入哪个就是哪个消费者接收
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:" + message);
        }
    }
}

消费者1代码:

package com.youzi.rabbitmq.six;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.youzi.rabbitmq.utils.RabbitMqUtils;

/**
 * @Author youzi
 * @ClassName ReceiveLogsDirect01
 * @Date 2022/7/4 17:22
 * @Description //TODO
 */
public class ReceiveLogsDirect01 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //队列声明
        channel.queueDeclare("console", false, false, false, null);
        //队列绑定
        channel.queueBind("console", EXCHANGE_NAME, "info");
        channel.queueBind("console", EXCHANGE_NAME, "warning");

        //发送回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogsDirect01接收到的消息:" + new String(message.getBody()));
        };

        channel.basicConsume("console", true, deliverCallback, consumerTag -> {
        });
    }
}

消费者2代码:

package com.youzi.rabbitmq.six;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.youzi.rabbitmq.utils.RabbitMqUtils;

/**
 * @Author youzi
 * @ClassName ReceiveLogsDirect02
 * @Date 2022/7/4 17:22
 * @Description //TODO
 */
public class ReceiveLogsDirect02 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //队列声明
        channel.queueDeclare("disk", false, false, false, null);
        //队列绑定
        channel.queueBind("disk", EXCHANGE_NAME, "error");

        //发送回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogsDirect02接收到的消息:" + new String(message.getBody()));
        };

        channel.basicConsume("disk", true, deliverCallback, consumerTag -> {
        });
    }
}

7.3、Topic 模式(主题模式)

主题交换机是通过匹配路由键进行消息的转发,对于key匹配上的,转发相应队列,全都不匹配的直接丢弃;

规则:

  1. 所有的单词均是由 . 分隔;
  2. 可以匹配任意一个单词;
  3. 可以匹配0个或多个单词;

注意:

  1. 当一个队列的routeKey是#,那么这个队列将接收所有消息,类似于FANOUT;
  2. 当一个队列的routeKey没有包含#和*,那么这个队列类似于DIRECT;
routeKey匹配
*.hello.*匹配三个单词,且中间的必须是hello
welcome.*匹配两个单词,第一个必须是welcome
hello.world匹配两个单词,第一个是hello,第二个是world
#.happy匹配至少一个单词,且最后一个单词必须是happy
happy.*.#匹配至少两个单词,第一个必须是happy

生产者代码:

package com.youzi.rabbitmq.seven;
import com.rabbitmq.client.Channel;
import com.youzi.rabbitmq.utils.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;

public class EmitLogTopic {
    //交换机的名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        /**
         * Q1-->绑定的是
         *      中间带 orange 带 3 个单词的字符串(*.orange.*)
         * Q2-->绑定的是
         *      最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
         *      第一个单词是 lazy 的多个单词(lazy.#)
         */
        HashMap<String, String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");

        for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
            String bindingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();

            channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:" + message);
        }
    }
}

消费者代码1:

package com.youzi.rabbitmq.seven;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.youzi.rabbitmq.utils.RabbitMqUtils;

/**
 * @Author youzi
 * @ClassName ReceiveLogsTopic01
 * @Date 2022/7/5 14:11
 * @Description //TODO
 */
public class ReceiveLogsTopic01 {
    //交换机名称
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        //声明队列
        String queueName = "Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        System.out.println("等待接收消息....");

        //发送回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogsTopic01接收到的消息:" + new String(message.getBody()));
            System.out.println("接收队列:" + queueName + "绑定的键:" + message.getEnvelope().getRoutingKey());
        };

        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });

    }
}

消费者代码2:

package com.youzi.rabbitmq.seven;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.youzi.rabbitmq.utils.RabbitMqUtils;

/**
 * @Author youzi
 * @ClassName ReceiveLogsTopic01
 * @Date 2022/7/5 14:11
 * @Description //TODO
 */
public class ReceiveLogsTopic02 {
    //交换机名称
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        //声明队列
        String queueName = "Q2";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
        System.out.println("等待接收消息....");

        //发送回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogsTopic02接收到的消息:" + new String(message.getBody()));
            System.out.println("接收队列:" + queueName + "绑定的键:" + message.getEnvelope().getRoutingKey());
        };

        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });

    }
}

8、死信队列

死信就是无法被消费的消息;当生产者把消息投递到broker或者直接到队列中,消费者从队列中取出消息进行消费,但由于特殊情况,队列中的个别消息无法被消费,这样的消息如果没有后续的处理,就会变成死信,存放在死信队列中;

通常进入死信队列的消息要么是异常消息,要么是超时消息;

死信队列的来源:

  1. 消息TTL时间过期;
  2. 队列达到最大长度,无法添加新的消息到mq中;
  3. 消息被拒绝(basic.reject或者nack)并且requeue=false;

8.1、消息TTL时间过期

场景模拟:声明两个直接交换机和队列,逻辑交换机绑定逻辑队列,死信交换机绑定死信队列,然后生产者发送消息到逻辑交换机,由于逻辑交换机接收超时,导致消息进入死信队列,被消费者2处理;

声明TTL时间过期

  1. 在Consumer中可以queueDeclare队列声明处增加参数map,使用keyx-message-ttl设置;(不推荐)
  2. 在生产者发送消息的时候,通过BasicProperties设置发送消息的TTL,推荐使用,不同消息可以设置不同的消息TTL;

生产者代码:

package com.youzi.rabbitmq.eight;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.youzi.rabbitmq.utils.RabbitMqUtils;

/**
 * @Author youzi
 * @ClassName Producer
 * @Date 2022/7/11 14:27
 * @Description //TODO
 */
public class Producer {
    //正常的交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //死信消息 设置TTL时间(过期时间) 单位是ms
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        //发送消息
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
        }

    }
}

消费者1代码:

package com.youzi.rabbitmq.eight;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.youzi.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author youzi
 * @ClassName Consumer01
 * @Date 2022/7/11 10:53
 * @Description //TODO 死信队列
 */
public class Consumer01 {

    //正常的交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信的交换机
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //正常的队列
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信的交换机
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明正常和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明正常队列
        Map<String, Object> arg = new HashMap<>();
        //过期时间 单位ms 10s = 10000
        //(两个地方可以设置,建议在生产者设置,因为消费者这边设置的是固定值,不可更改,消费者可以设置每条消息的过期时间)
        arg.put("x-message-ttl", 10000);
        //正常交换机设置死信交换机
        arg.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //设置死信RoutingKey
        arg.put("x-dead-letter-routing-key", "lisi");
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arg);

        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        //普通队列绑定普通交换机
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        //死信队列绑定死信交换机
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收消息.....");

        //发送回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer01接受的消息是:" + new String(message.getBody()));
        };

        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {
        });
    }
}

消费者2代码:

package com.youzi.rabbitmq.eight;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.youzi.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author youzi
 * @ClassName Consumer01
 * @Date 2022/7/11 10:53
 * @Description //TODO 死信队列
 */
public class Consumer02 {
    //死信的交换机
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("等待接收消息.....");

        //发送回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02接受的消息是:" + new String(message.getBody()));
        };

        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {
        });
    }
}

8.2、队列达到最大长度,无法添加新的消息到MQ

操作:在声明logic逻辑队列的参数中,增加key值为x-max-length;标识队列最大消息长度;

先要把原来的normal_queue队列删除,否则启动会报错,代码与上面的差不多,只需要把生产者的超时时间删除

// 在消费者1的代码中,参数map增加设置超时时间的key即可
// 声明队列最大长度6条消息
arg.put("x-max-length", 6);

8.3、消息被拒绝

消费者1需要手动确认消息,把确认消息设置成reject;则该消息进入死信队列;

消费者1代码:

package com.youzi.rabbitmq.eight;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.youzi.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author youzi
 * @ClassName Consumer01
 * @Date 2022/7/11 10:53
 * @Description //TODO 死信队列
 */
public class Consumer01 {

    //正常的交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信的交换机
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //正常的队列
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信的交换机
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明正常和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明正常队列
        Map<String, Object> arg = new HashMap<>();
        //过期时间 单位ms 10s = 10000
        //(两个地方可以设置,建议在生产者设置,因为消费者这边设置的是固定值,不可更改,消费者可以设置每条消息的过期时间)
        //arg.put("x-message-ttl", 10000);
        //正常交换机设置死信交换机
        arg.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //设置死信RoutingKey
        arg.put("x-dead-letter-routing-key", "lisi");
        //声明队列最大长度6条消息
        //arg.put("x-max-length", 6);
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arg);

        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        //普通队列绑定普通交换机
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        //死信队列绑定死信交换机
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收消息.....");

        //发送回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody());
            if (msg.equals("info5")) {
                System.out.println("Consumer01接受的消息是:" + msg + ":此消息是被C1拒绝的");
                //false表示不塞回队列,即进入死信队列
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01接受的消息是:" + msg);
                //false指的是不批量应答
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }

        };

        //开启手动应答
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {
        });
    }
}

消费者2和生产者代码不变

评论

暂无

添加新评论