【java】RabbitMQ-路由模式RoutingKey

2022-09-11 13:18:34

生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)

例如:我们可以把路由key设置为insert ,那么消费者队列key指定包含insert才可以接收消息,消费者队列key定义为update或者delete就不能接收消息。很好的控制了更新,插入和删除的操作。

采用交换机direct模式

生产者代码如下:

package com.rabbitmqdemo.Producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmqdemo.Utils.MQConnectionUtils;
import java.util.concurrent.TimeoutException;

import java.io.IOException;

public class DirectProducter {
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = MQConnectionUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String routingKey = "info";
        String msg = "direct_exchange_msg" + routingKey;
        // 4.发送消息
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
        System.out.println("生产者发送msg:" + msg);
        // // 5.关闭通道、连接
        // channel.close();
        // connection.close();
        // 注意:如果消费没有绑定交换机和队列,则消息会丢失

    }
}

短信消费者代码:

package com.rabbitmqdemo.consumer;

import com.rabbitmq.client.*;
import com.rabbitmqdemo.Utils.MQConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DirectSMSConsumer {
    private static final String QUEUE_NAME = "consumer_direct_sms";
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消费者启动");
        // 1.创建新的连接
        Connection connection = MQConnectionUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取生产者消息:" + msg);
            }
        };
        // 5.消费者监听队列消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

邮件消费者代码:

package com.rabbitmqdemo.consumer;

import com.rabbitmq.client.*;
import com.rabbitmqdemo.Utils.MQConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DirectEmailConsumer {
    private static final String QUEUE_NAME = "consumer_direct_email";
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者启动");
        // 1.创建新的连接
        Connection connection = MQConnectionUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取生产者消息:" + msg);
            }
        };
        // 5.消费者监听队列消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
  • 作者:不死鸟.亚历山大.狼崽子
  • 原文链接:https://blog.csdn.net/u013938578/article/details/108412213
    更新时间:2022-09-11 13:18:34