生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
例如:我们可以把路由key设置为insert ,那么消费者队列key指定包含insert才可以接收消息,消费者队列key定义为update或者delete就不能接收消息。很好的控制了更新,插入和删除的操作。
采用交换机direct模式
生产者代码如下:
package com.rabbitmqdemo.Producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmqdemo.Utils.MQConnectionUtils;
import java.util.concurrent.TimeoutException;
import java.io.IOException;
public class DirectProducter {
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String routingKey = "info";
String msg = "direct_exchange_msg" + routingKey;
// 4.发送消息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("生产者发送msg:" + msg);
// // 5.关闭通道、连接
// channel.close();
// connection.close();
// 注意:如果消费没有绑定交换机和队列,则消息会丢失
}
}
短信消费者代码:
package com.rabbitmqdemo.consumer;
import com.rabbitmq.client.*;
import com.rabbitmqdemo.Utils.MQConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DirectSMSConsumer {
private static final String QUEUE_NAME = "consumer_direct_sms";
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者启动");
// 1.创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取生产者消息:" + msg);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
邮件消费者代码:
package com.rabbitmqdemo.consumer;
import com.rabbitmq.client.*;
import com.rabbitmqdemo.Utils.MQConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DirectEmailConsumer {
private static final String QUEUE_NAME = "consumer_direct_email";
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者启动");
// 1.创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取生产者消息:" + msg);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}