上一节中使用了消息路由,消费者可以选择性的接收消息。 但是这样还是不够灵活。
比如某个消费者要订阅娱乐新闻消息 。 包括新浪、网易、腾讯的娱乐新闻。那么消费者就需要绑定三次,分别绑定这三个网站的消息类型。 如果新闻门户更多了,那么消费者将要绑定个更多的消息类型, 其实消费者只是需要订阅娱乐新闻,不管是哪个网站的新闻,都需要。 那么在rabbitMQ中可以使用topic类型。 模糊匹配消息类型。
模糊匹配中的 *代表一个 #代表零个或多个
示例:
1 package com.zf.rabbitmq06; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.ConsumerCancelledException; 9 import com.rabbitmq.client.QueueingConsumer;10 import com.rabbitmq.client.QueueingConsumer.Delivery;11 import com.rabbitmq.client.ShutdownSignalException;12 13 /**14 * 接收消息15 * @author zhoufeng16 *17 */18 public class Recv06_01 {19 20 public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {21 22 ConnectionFactory connFac = new ConnectionFactory() ;23 24 connFac.setHost("127.0.0.1");25 26 Connection conn = connFac.newConnection() ;27 28 Channel channel = conn.createChannel() ;29 30 31 String exchangeName = "exchange03";32 33 channel.exchangeDeclare(exchangeName, "topic") ;34 35 String queueName = channel.queueDeclare().getQueue() ;36 37 //第三个参数就是type,这里表示只接收type01类型的消息。38 channel.queueBind(queueName, exchangeName, "#.type01") ;39 40 41 //配置好获取消息的方式42 QueueingConsumer consumer = new QueueingConsumer(channel) ;43 channel.basicConsume(queueName, true, consumer) ;44 45 //循环获取消息46 while(true){47 48 //获取消息,如果没有消息,这一步将会一直阻塞49 Delivery delivery = consumer.nextDelivery() ;50 51 String msg = new String(delivery.getBody()) ; 52 53 System.out.println("received message[" + msg + "] from " + exchangeName);54 }55 56 }57 58 }
1 package com.zf.rabbitmq06; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 9 /**10 * 发送消息11 * @author zhoufeng12 *13 */14 public class Sender06 {15 16 public static void main(String[] args) throws IOException {17 18 ConnectionFactory connFac = new ConnectionFactory() ;19 20 //RabbitMQ-Server安装在本机,所以直接用127.0.0.121 connFac.setHost("127.0.0.1");22 23 //创建一个连接24 Connection conn = connFac.newConnection() ;25 26 //创建一个渠道27 Channel channel = conn.createChannel() ;28 29 String exchangeName = "exchange03";30 31 String messageType = "fs.type01";32 33 channel.exchangeDeclare(exchangeName, "topic") ;34 35 //定义Queue名36 String msg = "Hello World!";37 38 //发送消息39 channel.basicPublish( exchangeName , messageType , null , msg.getBytes());40 41 System.out.println("send message[" + msg + "] to "+ exchangeName +" success!");42 43 channel.close(); 44 conn.close(); 45 46 }47 48 }
使用topic之后 。不管Sender端发送的消息类型是fs.type01 还是 xx.type01 还是 type01 ,消费者都会收到消息