RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

  • A+
所属分类:RabbitMQ

文章目录

参考文献:
官方文档(英文)链接:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
RabbitMQ简介 英文教程(半小时):https://www.youtube.com/results?search_query=rabbitmq
尚硅谷文档(中文):链接:https://pan.baidu.com/s/1xhh5b02mC9FeOlgKkGCyvg 提取码:t8oh

看到没解释的参数不要着急,先不用管他,后面慢慢都会解释的,因为需要一些铺垫~

零、 RabbitMQ安装

1. 在官网下载rabbitmq-server

官网链接:https://www.rabbitmq.com/download.html

2. 在GitHub上下载erlang

GitHub链接:https://github.com/rabbitmq/erlang-rpm

3. 将文件上传至你的服务器or虚拟机的/usr/local/software目录下

如果没有/software,自己建立一个:

mkdir /usr/local/software

4. 安装文件

rpm -ivh erlang-21.3-1.el7.x86_64.rpm  # 改成你自己的版本号
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm # 改成你自己的版本号

5. 常用命令

chkconfig rabbitmq-server on		# 添加开机启动RabbitMQ服务【配置时只需要这两条】
/sbin/service rabbitmq-server start # 启动服务【配置时只需要这两条】
/sbin/service rabbitmq-server status# 查看服务状态(绿色Active running就是已启动)
/sbin/service rabbitmq-server stop	# 停止服务

rabbitmqctl stop_app				# 关闭应用
rabbitmqctl reset					# 全部清除
rabbitmqctl start_app				# 重新启动

6. 开启web管理插件

rabbitmq-plugins enable rabbitmq_management

如果是云服务器,请在阿里云(或其他云)官网+宝塔(如有)放行你的5672和15672端口!不要轻易关闭防火墙!

默认账号为:guest,默认密码为:guest
但是只有本机才能用这个账号密码登陆,在其他机器访问服务器会出现User can only log in via localhost错误。
因此还需要配置一个新用户:

rabbitmqctl add_user 用户名 密码					# 添加新用户,密码请用强密码!
rabbitmqctl set_user_tags 用户名 administrator	# 设置用户角色
rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"	# 设置用户权限,这里表示该用户具有对/vhost1下所有资源的配置+读写权限。set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl list_users							# 查看当前用户及其角色

一、快速开始 Hello World

官方文档链接:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

1.1 RabbitMQ 简介

RabbitMQ(MQ:Message Queue)

  • 是一个消息代理(Message Broker),用于接收和转发消息。【接收 -> 存储 -> 转发二进制数据】
  • 生产Producing:发送消息,发送消息的程序称为“生产者”。
    RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)
  • 消费Consuming:接受消息,接受消息的程序称为“消费者”。
    RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)
  • 队列Queue:消息队列是RabbitMQ里的“邮箱”,本质上是一个消息缓冲区buffer。生产者发送消息给队列,消息存储在队列中,转发到消费者处。队列只受到主机内存和磁盘大小的限制。
    RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

1.2 示例程序:Hello World

实现:生产者发送一条程序,消费者接收消息并打印出来。
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

1. 建立Java Maven工程,在pom.xml中添加以下配置

(By the way,我用的是JDK8)

  • Note:如果amqp-client找不到,检查一下你的网络设置(是不是用了公司内网),可以用手机热点试试。
    <dependencies>
<!-- rabbitmq 依赖客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
<!-- 操作文件流的一个依赖-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
<!-- slf4j日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.10</version>
        </dependency>
<!-- junit测试-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>
    </dependencies>

2. 发送消息

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

public class Send {
   
  private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception {
   
  	// 工厂模式,建立连接
    ConnectionFactory factory = new ConnectionFactory();
    // 如果你是在主机上测试,只需要这一条
	// factory.setHost("localhost");
	// 如果是在本地访问服务器来测试,需要配置账号密码。
	// 也可以写一个properties文件来读取信息,后面还会集成进Utils里,这里先直接写死测试一下能不能通
    factory.setHost("xxx.xxx.xxx.xxx(这是服务器ip地址)");
    factory.setUsername("在安装时设置的管理员用户名");
    factory.setPassword("在安装时设置的管理员密码");
    
    // 建立连接和管道
    // 使用try()语句,connection和channel都实现了java.io.Closeable,所以不用显式地.close()关掉连接
	try (Connection connection = factory.newConnection();
	     Channel channel = connection.createChannel()) {
   
	     	// 参数一:声明我们要发送的队列是谁(QUEUE_NAME),其他参数这里先不用关注
		    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		    // 发送消息
			String message = "Hello World!";
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			System.out.println(" [x] 发送消息 '" + message + "'");
	}
  }
}

channel.queueDeclare(param 1, param2, param3, param4, param5)
生成一个队列
1.队列名称
2.队列里面的消息是否持久化 默认消息存储在内存中
3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
5.其他参数

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
发送一个消息
1.发送到哪个交换机
2.路由的 key 是哪个
3.其他的参数信息
4.发送消息的消息体

3. 接收消息

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)
生产者发送完就结束进程了,消费者则会一直监听消息。

public class Recv {
   
  private final static String QUEUE_NAME = "hello";
  
  public static void main(String[] argv) throws Exception {
   
  //======连接配置与Send一样,后面会写进Utils里========//
  	// 工厂模式,建立连接
    ConnectionFactory factory = new ConnectionFactory();
    // 如果你是在主机上测试,只需要这一条
	// factory.setHost("localhost");
	// 如果是在本地访问服务器来测试,需要配置账号密码。
	// 也可以写一个properties文件来读取信息,后面还会集成进Utils里,这里先直接写死测试一下能不能通
    factory.setHost("xxx.xxx.xxx.xxx(这是服务器ip地址)");
    factory.setUsername("在安装时设置的管理员用户名");
    factory.setPassword("在安装时设置的管理员密码");
    
    // 建立连接和管道
    // 这里不用try()包裹起建立的语句,原因是:
    // 我们的目的是不断监听消息,如果用try直接收到一条就close了,则不能达到监听的效果
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // 声明从哪个队列接受消息
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] 等待消息. To exit press CTRL+C");
    
    // 接收到信息回调接口,目的是当接收到一条信息时,进行一些操作,比如可以在控制台里打印出来,以告诉程序员收到了信息。
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
	    String message = new String(delivery.getBody(), "UTF-8");
	    System.out.println(" [x] 已收到 '" + message + "'");
    };
    // 取消接收的回调接口,目的是如在接收消息的时候队列被删除掉了,可以进行一些操作,例如告诉程序员接收被中断了。
    CancelCallback cancelCallback=(consumerTag) -> {
   
    	System.out.println("消息消费被中断"); 
    };
    
    // 管道接收消息
	channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
	}
}

4. 运行代码

  1. 先在终端把rabbitmq打开
    RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)
  2. 在idea里,先跑Revc再跑Send
    也可以通过命令行操作,在terminal里:
javac -cp amqp-client-5.8.0.jar Send.java Recv.java # 编译
java -cp .:amqp-client-5.8.0.jar:slf4j-api-1.7.36.jar Recv # 运行Recv
java -cp .:amqp-client-5.8.0.jar:slf4j-api-1.7.36.jar Send # 运行Send

结果:
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)


二、 工作队列/任务队列 Work Queues / Task Queues

官方文档链接:https://www.rabbitmq.com/tutorials/tutorial-two-java.html

2.1 简介

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

工作队列的核心思想:

  • 对于生产者(产生消息的人):避免必须立刻执行“资源紧张”的任务。
  • 对于消息队列:生产者想要做的“任务”会被封装成一个消息放在队列里。
  • 对于消费者(处理任务的人):当你有多个“工人”时,这些任务会被轮询分配给不同的工人。

这个思想也被应用于那些需要处理不能在一个很短的HTTP请求窗口期间完成的复杂任务的网页程序中。

2.2 示例程序

0. 把建立connection和channel的过程写在一个工具类里

建立一个Utils包,撰写一个RabbitUtil工具类。

public class RabbitUtil {
   
    public static Channel getChannel() throws Exception {
   
        // 引入配置文件
        Properties properties = new Properties();
        InputStream inputStream = new FileInputStream(new File("src/main/resources/rabbit-user.properties"));
        properties.load(inputStream);
        String host = properties.getProperty("rabbit.host");
        String username = properties.getProperty("rabbit.username");
        String password = properties.getProperty("rabbit.password");

        // 连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

        // 建立连接和信道
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

在resources下,建立一个properties文件写入rabbitmq的配置信息。

rabbit.host=写入你的主机ip
rabbit.username=写入你的rabbitmq管理员名称
rabbit.password=写入你的rabbitmq管理员密码

1. 发送

我们用Thread.sleep()来模拟在现实中需要很长时间的复杂任务,打多少个“.”代表这个任务有多复杂。

基于上一节的Send.java,我们做一些修改:

public class NewTask {
   
  private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception {
   
  	// 建立连接和管道
    try (Channel channel = RabbitUtil.getChannel()){
   
   		// 参数一:声明我们要发送的队列是谁(QUEUE_NAME),其他参数这里先不用关注
	    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
	    // 发送消息
	    // 如果你是用shell测试的,用这条语句,用于放入参数:
		// String message = String.join(" ", argv); // ***主要改了这里***
		// 如果你是在idea里用控制台测试的,用这条语句:
		String message = new Scanner(System.in).nextLine();
		channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		System.out.println(" [x] 发送消息 '" + message + "'");
    }
  }
}

2.接收

接收程序需要做的修改:

  1. 模拟发送不同数量的"."来代表任务复杂度。
  2. 接收到消息后开始做任务

在接收程序上,我们做一些修改:

public class Worker {
   
  private final static String QUEUE_NAME = "hello";
  
  public static void main(String[] argv) throws Exception {
   
	  	// 建立连接和管道
	  	Channel channel = RabbitUtil.getChannel();
	    // 声明从哪个队列接受消息
	    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
	    System.out.println(" [*] 等待消息. To exit press CTRL+C");
	    
	    // 接收到信息回调接口,目的是当接收到一条信息时,进行一些操作,比如可以在控制台里打印出来,以告诉程序员收到了信息。
	    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
		    String message = new String(delivery.getBody(), "UTF-8");
		    System.out.println(" [x] 已收到 '" + message + "'");
		    // ***主要改了这里***
		    try{
   
		    	doWork(message);
		    } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
		    	System.out.println(" [x] 工作任务完成!");
		    }
	    };
	    // 取消接收的回调接口,目的是如在接收消息的时候队列被删除掉了,可以进行一些操作,例如告诉程序员接收被中断了。
	    CancelCallback cancelCallback=(consumerTag) -> {
   
	    	System.out.println("消息消费被中断"); 
	    };
	    // 这个参数后面会说, 详见2.3
	    boolean autoAck = true;
	    
	    // 管道接收消息
		channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
	}
	
	// 用于模拟工作任务,输入一个点就停顿一秒
	private static void doWork(String task) throws InterruptedException {
   
	    for (char ch: task.toCharArray()) {
   
	        if (ch == '.') Thread.sleep(1000);
	    }
	}
}

运行代码

在shell里运行的方法同上一章。

先配置让idea可以运行多个实例。(我的idea版本是2021.3,没有Allow Parallel run选项了,老版本idea请参考文章
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

测试结果

发送消息
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

处理消息

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

2.3 消息应答

1. 轮流调度 Round-robin dispatching

用工作队列的一个好处是可以轻松地进行并行工作。如果我们积攒了很多任务没做,这时只需要多加几个工人,可以很轻松地扩大处理规模scale

默认情况下,RabbitMQ会把消息按顺序传给下一个消费者。平均来看,每个消费者拿到的信息数量都是相同的。这种分发信息的机制被称为轮流调度(轮询,round-robin)。

2. 消息确认 Message Acknowledgements

在我们现在写的这份代码里,RabbitMQ一把信息转发给消费者(工人)就会马上把这个任务在队列里删掉。
而完成一个任务需要一定的时间,那如果一个工人在做某项任务期间突然被打断了,我们就会丢失这个任务信息。不仅如此,我们还会丢掉所有交给这个工人但他还没完成的任务。

如果你不想让信息丢失,我们就要开启RabbitMQ的信息确认功能。消费者在接收到并处理完一个任务后,会给RabbitMQ发一个确认信息(Acknowledgement, ACK),告诉他任务已经完成了,可以删掉了。如果消费者没完成任务就死掉了(例如管道关闭了、连接丢失了、TCP连接断掉了),一段时间后RabbitMQ没收到确认信息ACK,就会知道给他的消息没有被处理,从而把这个消息再放进队列里,并让其他消费者去处理。
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

默认情况下,RabbitMQ会等30分钟。你也可以用rabbitmq.conf中的参数consumer_timeout自定义超时时间。(点此查看官方文档解释

默认情况下,自动应答功能是打开的。我们刚才的代码里boolean autoAck = true;把这个功能关掉了。

接下来,我们来测试自动应答功能,autoAck改为:boolean autoAck = false;
结束掉一个在工作途中的worker进程,看一下最终的效果,消息会被重新分配给其他worker。

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

注意:这里的信息确认ACK必须从收到信息的channel发回去。否则会出现channel-level protocol exception

3. 消息持久化

如果RabbitMQ服务器挂掉了,消息也是会丢失的,除非你将队列和消息进行持久化(写入磁盘)。

第一步,修改队列声明的参数:

	boolean durable = true;
	channel.queueDeclare("task_queue", durable, false, false, null);

注意,如果已经声明并使用了一个队列,那么不可以修改他的参数,只能重新换一个队列名称(生产者和消费者代码中的队列名称都要改)。

第二步,需要标记我们的消息是持久化的:

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

注意,虽然我们标记了消息是需要持久化的,但RabbitMQ接收到消息->持久化到磁盘仍然需要一定时间,这就意味着消息可能在缓存里,依然有丢失的可能。不过对于简单的任务队列这也够用了,如果还需要更强的保证消息不丢失,则需要使用“发布者确认”publisher confirms。【见2.4】

4. 公平分配

假设我们有两个工人,按顺序分配任务,如果奇数的任务很重偶数的任务很轻松,就会出现有一个工人累的要死,另一个却很闲的情况。任务量分配不均的原因是:RabbitMQ没有看每个工人完成的工作量(即,收到的ACK数)。

为了解决这个问题,可以使用basicQos(Channel Prefetch Setting)方法,即当工人做完一个任务再给他下一个,不要一次性给多个任务。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

综合代码

综上四个问题,我们修改代码如下:

生产者:

public class NewTask {
   
    private final static String QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
   
        // 建立连接和管道
        Channel channel = RabbitUtil.getChannel();
        // 参数一:声明我们要发送的队列是谁(QUEUE_NAME),其他参数这里先不用关注
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 发送消息
        String message = new Scanner(System.in).nextLine();
// String message = String.join(" ", argv); // ***主要改了这里***
        channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        System.out.println(" [x] 发送消息 '" + message + "'");
    }
}

消费者:

public class Worker {
   
    private final static String QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
   
        // 建立连接和管道
        Channel channel = RabbitUtil.getChannel();
        // 声明从哪个队列接受消息
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] 等待消息. To exit press CTRL+C");

        // 平均分配
        channel.basicQos(1);

        // 接收到信息回调接口,目的是当接收到一条信息时,进行一些操作,比如可以在控制台里打印出来,以告诉程序员收到了信息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 已收到 '" + message + "'");
            try {
   
                doWork(message);
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
                System.out.println(" [x] 工作任务完成!");
                // 使用basicAck方法
                /** * 消息应答的方法 * A.Channel.basicAck(用于肯定确认) * RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了 * B.Channel.basicNack(用于否定确认) * C.Channel.basicReject(用于否定确认) * 与 Channel.basicNack 相比少一个参数 不处理该消息了直接拒绝,可以将其丢弃了 * * 第一个参数 * 获取发送内容的标签 * * 第二个参数(见下图) * multiple 的 true 和 false 代表不同意思 * true 代表批量应答【 channel 上未应答的消息】 * 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 * 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答 * false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答 * */
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            }
        };
        // 取消接收的回调接口,目的是如在接收消息的时候队列被删除掉了,可以进行一些操作,例如告诉程序员接收被中断了。
        CancelCallback cancelCallback = (consumerTag) -> {
   
            System.out.println("消息消费被中断");
        };

        // 手动应答,应答方式见basicAck
        // 手动应答的好处是可以批量应答并且减少网络拥堵
        boolean autoAck = false;

        // 管道接收消息
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }

    // 用于模拟工作任务,输入一个点就停顿一秒
    private static void doWork(String task) throws InterruptedException {
   
        for (char ch : task.toCharArray()) {
   
            if (ch == '.') Thread.sleep(1000);
        }
    }
}

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

Forgotten acknowledgement
有一个常见的错误是忽略basicAck。当客户退出时,信息看起来是被随机地重新交付了,但RabbitMQ会吃掉越来越多的内存,因为它不能释放任何还没确认的信息。
如何debug这种错误:
sudo rabbitmqctl list_queues name messages_ready
messages_unacknowledged


2.4 发布确认

官方文档链接

生产者将channel设置成 confirm 模式,一旦channel进入 confirm 模式,所有在该channel上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认ACK给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。

如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ackmultiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息。

如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该nack消息。

1. 开启发布确认

发布确认默认是关闭的,如果要开启需要调用方法 confirmSelect。

channel.confirmSelect();

2. 单个确认发布

发布一个消息之后只有它被确认发布,后续的消息才能继续发布。
waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

**缺点: **发布速度特别慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。

    public static void publishMessageIndividually() throws Exception {
   
        try (Channel channel = RabbitMqUtils.getChannel()) {
   
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null); 
            //开启发布确认
            channel.confirmSelect();
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
   
                String message = i + "";
                channel.basicPublish("", queueName, null, message.getBytes());
				
				//服务端返回 false 或超时时间内未返回,生产者可以消息重发
                boolean flag = channel.waitForConfirms();
                if (flag) {
   
                    System.out.println("消息发送成功");
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
        }
    }

3. 批量确认发布

与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量。

**缺点: ** 当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。这种方案仍然是同步的,也一样阻塞消息的发布。

    public static void publishMessageBatch() throws Exception {
   
        try (Channel channel = RabbitMqUtils.getChannel()) {
   
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null); 
            //开启发布确认
            channel.confirmSelect();
			//批量确认消息大小
            int batchSize = 100; 
            //未确认消息个数
            int outstandingMessageCount = 0;
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
   
                String message = i + "";
                channel.basicPublish("", queueName, null, message.getBytes());
                outstandingMessageCount++;
                if (outstandingMessageCount == batchSize) {
   
                    channel.waitForConfirms();
                    outstandingMessageCount = 0;
                }
            }
			//为了确保还有剩余没有确认消息 再次确认 
			if (outstandingMessageCount > 0)
            {
   
                channel.waitForConfirms();
            }
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
        }
    }

4. 异步确认

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都更好。

异步确认是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

    public static void publishMessageAsync() throws Exception {
   
        try (Channel channel = RabbitMqUtils.getChannel()) {
   
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null); //开启发布确认
            channel.confirmSelect();
            /** * 线程安全有序的一个哈希表,适用于高并发的情况 * 1.轻松的将序号与消息进行关联 * 2.轻松批量删除条目 只要给到序列号 * 3.支持并发访问 */
            ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<> ();
            /** * 确认收到消息的一个回调 * 1.消息序列号 * 2.true可以确认小于等于当前序列号的消息 * false 确认当前序列号消息 */
            ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
   
                if (multiple) {
   
					//返回的是小于等于当前序列号的未确认消息 是一个 map
                    ConcurrentNavigableMap<Long,String> confirmed = outstandingConfirms.headMap (sequenceNumber, true);
					//清除该部分未确认消息 
					confirmed.clear();
                } else {
   
					//只清除当前序列号的消息 
					outstandingConfirms.remove(sequenceNumber);
                }
            }; ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
   
                String message = outstandingConfirms.get(sequenceNumber);
                System.out.println("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
            };
            /** * 添加一个异步确认的监听器 * 1.确认收到消息的回调 * 2.未收到消息的回调 */
            channel.addConfirmListener(ackCallback, null);
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
   
                String message = "消息" + i;
                /** * channel.getNextPublishSeqNo()获取下一个消息的序列号 * 通过序列号与消息体进行一个关联 * 全部都是未确认的消息体 */
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                channel.basicPublish("", queueName, null, message.getBytes());
            }
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
        }
    }

如何处理异步未确认信息?
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,
比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

5. 三种发布速度的对比

  • 单独发布消息
    同步等待确认,简单,但吞吐量非常有限。
  • 批量发布消息
    批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。
  • 异步处理
    最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

三、发布/订阅 Publish / Subscribe

官方文档链接:https://www.rabbitmq.com/tutorials/tutorial-three-java.html

  • 工作队列模式:一个消息给一个接收者
  • 发布订阅模式:一个消息给多个接收者

案例说明

建立一个日志记录系统:

  1. 一个程序发送日志消息
  2. 另一个程序接收消息并打印
  • 每一个接收程序都会收到日志,那我们就可以让一个接收者把日志持久化到磁盘,另一个接收者把日志打印出来。
  • 每个发布的日志消息都会被广播给所有接收者。

3.1 交换机 Exchanges, X

RabbitMQ消息机制的核心思想是:生产者不直接把消息发给队列(他甚至不知道消息会被发给哪个队列),而是把消息发给交换机。

交换机 会知道要把这个消息发给哪个/哪些队列或丢弃。-> 使用exchange type来声明(exchange type包括directtopicheadersfanout

/** * logs是这次交换的名称 * fanout:广播,把收到的信息发给所有的接收者 **/
channel.exchangeDeclare("logs", "fanout"); 

Tip: 查看交换方式的命令:
sudo rabbitmqctl list_exchanges

在之前写工作队列时,我们没有指定交换方式,却也发送成功了信息,是因为我们是用了匿名交换 (Nameless exchange),也就是默认交换。
channel.basicPublish("", "hello", null, message.getBytes()); 这里的""就是是用了默认交换方式:消息会发送给在routingKey里查到的对应的queue。

由此,我们可以以广播形式发布对应的信息了,即

channel.basicPublish("logs", "", null, message.getBytes());

【后续有合并的代码】

3.2 临时队列 Temporary Queues

我们的日志记录系统需要监听所有的日志消息,而不是只是一小部分。另外,我们只关注现在的消息,而不是过时的消息。因此,我们需要完成两件事:

  1. 任何时候我们连接到Rabbit时,他会给我们全新的空队列,并生成随机队列名。
  2. 断开连接时,队列会自动删除。

我们用以下语句,可以生成一个不持久化的、特有的、自动删除的队列:

String queueName = channel.queueDeclare().getQueue();

特有的 Exclusive:used by only one connection and the queue will be deleted when that connection closes
文档Link:https://www.rabbitmq.com/queues.html

3.3 绑定 Bindings

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

我们已经创建了一种扇出fanout交换方式和一个队列,接下俩我们要让交换机把消息传给队列,这个关系就叫做绑定binding

channel.queueBind(queueName, "logs", "");

列出所有的绑定:rabbitmqctl list_bindings

3.4 案例代码

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

代码整体和前文差别不大,主要在于定义了“logs”交换方式。

发送者:

public class EmitLog {
   
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{
   
        Channel channel = RabbitUtil.getChannel();
        // 声明交换名称和方式
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        String message = new Scanner(System.in).nextLine();

        channel.basicPublish(EXCHANGE_NAME, "",null,message.getBytes(StandardCharsets.UTF_8));

        System.out.println(" [x] 发送信息 '" + message + "'");
        channel.close();
        channel.getConnection().close();
    }
}

接收者:

public class ReceiveLogs {
   
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
   
        // 获得一个channel
        Channel channel = RabbitUtil.getChannel();
        // 声明交换模式
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 获得队列名称
        String queueName = channel.queueDeclare().getQueue();
        // 绑定队列和交换机
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] 等待信息. To exit press CTRL+C");

        // 收到消息的回调接口
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 收到信息 '" + message + "'");
        };
        // 取消发送的回调接口
        CancelCallback cancelCallback = (consumerTag) -> {
   
            System.out.println("消息消费被中断");
        };
        // 接收信息
        channel.basicConsume(queueName,true,deliverCallback, cancelCallback);
    }
}

结果

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

四、路由 Routing(Direct模式)

  • 工作队列模式:一个消息给一个接收者
  • 发布订阅模式:一个消息给多个接收者
  • 路由模式:接收者接收一部分信息

4.1 绑定 Bindings

复习一下上文创建绑定的方式:

channel.queueBind(queueName, EXCHANGE_NAME, "");

这里的"" 实际上是路由绑定键routingKey参数。

channel.queueBind(queueName, EXCHANGE_NAME, "black");

4.2 直接交换方式 Direct Exchange

在第三章中的日志记录系统中,我们做一些改进:只把一部分重要的信息写进磁盘,但仍然打印所有的日志信息。

与上文使用fanout模式不同,这里我们使用direct交换模式。这种模式将消息发送给对应的队列,这个队列和交换机的绑定键binding key和这条消息的路由键routing key是匹配的。

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

如果现在有一条消息的路由键routing key是“orange”,那么他会被发给Q1 队列。

4.3 多重绑定 Multiple Bindings

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

你也可以给交换机和多个队列用同一个键绑定。

4.4 日志系统代码

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

对于发送者

创建一个直接交换方式的交换机:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

我们用log的严重程度作为路由键,如 info / warning / error

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

对于接收者

我们用log的严重程度作为绑定键:

String queueName = channel.queueDeclare().getQueue();

String[] severities = new String[]{
   "log", "warning", "error"};
for(String severity : severities){
   
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

合并代码

生产者

public class EmitLogDirect {
   
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
   
        try (Channel channel = RabbitUtil.getChannel()) {
   
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
			//创建多个 bindingKey
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("info", "普通 info 信息");
            bindingKeyMap.put("warning", "警告 warning 信息");
            bindingKeyMap.put("error", "错误 error 信息"); 
            //debug 没有消费这接收这个消息 就丢失了
            bindingKeyMap.put("debug", "调试 debug 信息");
            for (Map.Entry<String, String> bindingKeyEntry :
                    bindingKeyMap.entrySet()) {
   
                String bindingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println("生产者发出消息:" + message);
            }
        }
    }
}

消费者

  • 一部分写入磁盘
public class ReceiveLogsDirectSaveToDisk {
   
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
   
        Channel channel = RabbitUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = "disk";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println("等待接收消息........... ");
        
        // 收到消息的回调接口,将日志写入磁盘
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
            String message = new String(delivery.getBody(), "UTF-8");
            message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
            File file = new File("./rabbitmq_info.txt");
            FileUtils.writeStringToFile(file, message, "UTF-8");
            System.out.println("错误日志已经接收");
        };
        // 取消发送的回调接口
        CancelCallback cancelCallback = (consumerTag) -> {
   
            System.out.println("消息消费被中断");
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}
  • 一部分直接打印
public class ReceiveLogsDirectPrintOut {
   
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
   
        Channel channel = RabbitUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = "console";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 接收绑定键 :" + delivery.getEnvelope().getRoutingKey() + ", 消息:" + message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
   });
    }
}

结果

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

再看看rabbitMQ管理系统:
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

五、主题模式Topics

  • 工作队列模式:一个消息给一个接收者
  • 发布订阅模式(fanout):一个消息给多个接收者
  • 路由模式(direct):接收者接收一部分信息
  • 主题模式(topics):区分发送主体

之前我们的日志系统实现了根据不同信息传给不同的队列,现在我们需要对信息进一步筛选。例如,在Unix系统中,log可能有info/warn/crit的情况,这些log可能是从auth/cron/kern..传送来的,那么如果我们需要区分发送log的主体,仅接受来自cron的critical errors,就需要用到topic交换方式。

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

  • 路由键 routing key
    发送给topic交换模式的交换机 的消息 不能用随意的routing_key,它的路由键必须是一系列用"."隔开的词语,例如quick.orange.rabbit / stock.usd.nyse。词语的数量可以随便你,但是总长度不能超过255字节
  • 绑定键 binding key
    绑定键和路由键是同一个格式,消息会被发送给能和它路由键匹配的绑定键线路。没有match的消息就会被丢掉。比如,*.orange.* / *.*.rabbit / quick.orange.rabbit.#
    • 星号 "*":代替一个词
    • 井号 "#":代替零个或多个词

当队列的绑定键都是 #,topic exchange就和fanout exchange是一样的。
当队列的绑定键没有*#时,topic exchange就和direct exchange是一样的。

示例代码

生产者

public class EmitLogTopic {
   
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
   
        // 建立连接
        Channel channel = RabbitUtil.getChannel();

        // 声明topic交换模式的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        Map<String, String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.range.rabbit", "被队列Q1Q2接收到");
        bindingKeyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到");
        bindingKeyMap.put("quick.orange.fox", "被队列Q1接收到");
        bindingKeyMap.put("lazy.brown.fox", "虽然满足两个绑定但只被队列Q2接收一次");
        bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");

        Iterator<Map.Entry<String, String>> iterator =
                bindingKeyMap.entrySet().iterator();
        while (iterator.hasNext()){
   
            Map.Entry<String, String> next = iterator.next();
            String bindingKey = next.getKey();
            String message = next.getValue();

            channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发出消息: " + bindingKey + "---> " + message);
        }

        channel.close();
        channel.getConnection().close();
    }
}

消费者Q1

public class ReveiveLogsTopicQ1 {
   
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args)throws Exception{
   
        // 建立channel
        Channel channel = RabbitUtil.getChannel();

        // 声明交换
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 声明Q1队列与绑定关系
        String queueName = "Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME,"*.orange.*");

        System.out.println("等待接收消息。。匹配模式为\"*.orange.*\"");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("接收队列:" + queueName +
                    " --> 路由键:" + delivery.getEnvelope().getRoutingKey() +
                    " -- 消息:" + message);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
   
            System.out.println("接收失败。。");
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

消费者Q2

public class ReveiveLogsTopicQ2 {
   
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
   
        // 建立channel
        Channel channel = RabbitUtil.getChannel();

        // 声明交换
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 声明Q1队列与绑定关系
        String queueName = "Q2";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

        System.out.println("等待接收消息。。匹配模式为:\"*.*.rabbit\"或\"lazy.#\"");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("接收队列:" + queueName +
                    " --> 路由键:" + delivery.getEnvelope().getRoutingKey() +
                    " -- 消息:" + message);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
   
            System.out.println("接收失败。。");
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

结果

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

六、死信队列

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

死信:由于某些原因(消息TTL过期、队列达到最大长度、消息被拒绝)导致队列中的消息无法被处理。
RabbitMQ死信队列机制:当消息消费发生异常时,将消息投入死信队列。(例如,用户下单成功但未在指定时间内支付 -> 消息自动失效)

代码模拟死信三种情况

6.1 消息TTL过期

代码结构图见上图

生产者

public class Producer {
   
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception{
   
        // 获取连接
        Channel channel = RabbitUtil.getChannel();
        // 建立一个direct模式的交换
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 设置消息的TTL时间
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        // 该消息是用作演示队列的个数限制
        for (int i = 0; i < 11; i++) {
   
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan",properties,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送信息" + message);
        }

        channel.close();
        channel.getConnection().close();
    }
}

消费者

消费者1,处理正常队列中的信息

public class Consumer01 {
   
    private final static String NORMAL_EXCHANGE = "normal_exchange";
    private final static String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception{
   
        // 建立channel
        Channel channel = RabbitUtil.getChannel();

        // 声明死信和普通交换机,类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        // 死信队列绑定死信交换机与routingKey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        // 正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        // 正常队列设置死信交换机,key是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 正常队列设置死信routing-key,key是固定值
        params.put("x-dead-letter-routing-key", "lisi");

        String normalQueue = "normal_queue";
        // 将设置死信的参数params放进正常队列声明中
        channel.queueDeclare(normalQueue,false,false, false,params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("等待接收信息。。。");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("Consumer01 接收到信息: " + message);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
   
            System.out.println("接收失败");
        };

        channel.basicConsume(normalQueue, true, deliverCallback, cancelCallback);
    }
}

消费者2,处理死信队列中的信息

public class Consumer02 {
   
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
   
        // 建立channel
        Channel channel = RabbitUtil.getChannel();
        // 声明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 死信队列声明及绑定交换机
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信队列信息。。。。");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("Consumer02 接收到死信队列中的信息: " + message);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
   
            System.out.println("接收失败");
        };

        channel.basicConsume(deadQueue, true, deliverCallback, cancelCallback);
    }
}

结果

(C1需要启动完先关闭,再打开生产者)
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

此时再打开死信队列,死信队列里的消息被C2消费。
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

6.2 队列达到最大长度

在6.1代码中修改两处地方:

  1. 去掉生产者代码中的TTL语句
  2. 在C1消费者代码中添加 param.put("x-max-length", 6),设置正常队列的长度限制。

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

6.3 消息被拒

在6.2代码的基础上,修改C1消费者代码(生产者和C2消费者不变):

  1. 改为手动应答,修改DeliverCallback
  2. 删除param.put("x-max-length", 6)
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
            String message = new String(delivery.getBody(), "UTF-8");
            if (message.equals("info5")) {
   
                System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
                //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            } else {
   
                System.out.println("Consumer01 接收到消息" + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };

        boolean autoAck = false;

        channel.basicConsume(normalQueue, autoAck, deliverCallback, cancelCallback);

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

七、延迟队列

延迟队列:队列中的元素需要在指定时间取出和处理。例如,用户发起订单,十分钟内未支付则自动取消。

当数据量很大时,采取轮询的方式显然是不合理的,会给数据库带来很大压力。

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

7.1 RabbitMQ中的TTL

TTL,最大存活时间,表明消息或该队列中所有消息的最大存活时间。
有两种方式设置:

  1. 针对每条信息设置TTL
rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
   
			correlationData.getMessageProperties().setExpiration(ttlTime);
			return correlationData;
})
  1. 在创建队列时设置队列的x-message-ttl属性
params.put("x-message-ttl", 5000);
return QueueBuilder.durable(QUEUE_A).withArguments(params).build();

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中)。
而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的。
如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

延时队列核心 = 死信队列 + TTL:TTL让消息延迟多久后成为死信,消费者一直处理死信队列里的信息就行。

7.2 整合SpringBoot

1. 添加依赖

Springboot版本:2.6.7
JDK:8

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

<!-- rabbitMQ依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-boot-starter</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>25.1-jre</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

2. 修改配置文件 application.properties

spring.rabbitmq.host=你的主机ip
spring.rabbitmq.port=5672
spring.rabbitmq.username=你的rabbit用户名
spring.rabbitmq.password=你的rabbit密码

3. 添加swagger配置类

建立一个config包,SwaggerConfig类。

@Configuration
@EnableSwagger2
public class SwaggerConfig {
   

    @Bean
    public Docket webApiConfig() {
   
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }

    private ApiInfo webApiInfo() {
   
        return new ApiInfoBuilder()
                .title("RabbitMQ 接口文档")
                .description("本文档描述了rabbitmq微服务接口定义")
                .version("1.0")
                .contact(new Contact("cherry", "http://xxxx.github.io/", "xxxx@qq.com"))
                .build();
    }
}

7.3 队列TTL

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

创建两个队列QA和QB,两者队列TTL分别设置为10秒和40秒,然后再创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD。

根据以上架构图,配置队列、交换机、绑定。

@Configuration
public class TtlQueueConfig {
   
    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String DEAD_LETTER_QUEUE = "QD";
    
    // 声明 XExchange
    @Bean("xExchange")
    public DirectExchange xExchange(){
   
        return new DirectExchange(X_EXCHANGE);
    }
    // 声明 YExchange
    @Bean("yExchange")
    public DirectExchange yExchange(){
   
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    
    // 声明队列A,ttl为10s,绑定到对应的死信交换机
    @Bean("queueA")
    public Queue queueA(){
   
        Map<String, Object> args = new HashMap<>(3);
        // 声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key","YD");
        // 声明队列的TTL
        args.put("x-message-ttl", 10000);
        
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }
    // 声明队列A绑定X交换机
    @Bean
    public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
   
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    // 声明队列B,ttl为40s,绑定到对应的死信交换机
    @Bean("queueB")
    public Queue queueB(){
   
        Map<String, Object> args = new HashMap<>(3);
        // 声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key","YD");
        // 声明队列的TTL
        args.put("x-message-ttl", 40000);
        
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }
    // 声明队列B绑定X交换机
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queueB, 
                                  @Qualifier("xExchange") DirectExchange xExchange){
   
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    
    // 声明死信队列QD
    @Bean("queueD")
    public Queue queueD(){
   
        return new Queue(DEAD_LETTER_QUEUE);
    }
    // 声明死信队列和Y交换机的绑定关系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange){
   
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

测试

发起一个请求: http://localhost:8080/ttl/sendMsg/HelloCherry~

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

7.4 延时队列优化

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

配置文件代码

@Configuration
public class MsgTtlQueueConfig {
   
    private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    private static final String QUEUE_C = "QC";

    // 声明队列C 死信交换机
    @Bean("queueC")
    public Queue queueB(){
   
        Map<String, Object> args = new HashMap<>(3);
        // 声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        // 声明当前队列的死信路由
        args.put("x-dead-letter-routing-key","YD");
        // 没有声明TTL属性
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }

    // 声明队列B 绑定 X 交换机
    @Bean
    public Binding queueBindingX(@Qualifier("queueC") Queue queueC,
                                 @Qualifier("xExchange")DirectExchange xExchange){
   
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
}

消息生产者代码

    @GetMapping("sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){
   
        rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
   
            correlationData.getMessageProperties().setExpiration(ttlTime);
            return correlationData;
        });
        log.info("当前时间:{}, 发送一条时长{}毫秒TTL消息给队列C:{}", new Date(), ttlTime, message);
    }

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列。

如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

7.5 RabbitMQ插件实现延迟队列

下载插件,上传到/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制
消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

配置类文件代码

@Configuration
public class DelayedQueueConfig {
   
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @Bean
    public Queue delayedQueue(){
   
        return new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public CustomExchange delayedExchange(){
   
        // 自定义交换机类型
        Map<String, Object> args = new HashMap<>(3);
        args.put("x-delayed-type","direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange delayedExchange){
   
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}
  • 消息生产者代码
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime){
   
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData ->{
   
            correlationData.getMessageProperties().setDelay(delayTime);
            return correlationData;
        });
        log.info("当前时间:{},发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message);
    }
  • 消息消费者代码
@Component
@Slf4j
public class ConsumerController {
   
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message){
   
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延时队列的信息:{}",new Date(), msg);
    }
}

发起请求:
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。
另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。

w3cjava

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: