1、RabbitMQ的工作队列和路由 工作队列:Working Queue 工作队列这个概念与简单的发送/接收消息的区别就是:接收方接收到消息后,可能需要花费更长的时间来处理消息,这个过程就叫一个Work/Task。 几个概念 分配:多个接收端接收同一个Queue时,如何分配? 消息确认:Server端如何确定接收方的Work已经对消息进行了完整的处理? 消息持久化:发送方、服务端Queue如何对未处理的消息进行磁盘持久化? Round-robin分配 多个接收端接收同一个Queue时,采用了Round-robin分配算法,即轮叫调度——依次分配给各个接收方。 消息确认
2、 默认开启了消息确认(接收方接收到消息后,立即向服务器发回确认)。消息接收方处理完消息后,向服务器发送消息确认,服务器再删除该消息。 对于耗时的work,可以先关闭自动消息确认,在work完成后,再手动发回确认。 channel.basicConsume("hello",false/*关闭自动消息确认*/,consumer); // ...work完成后 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 持久化 1. Server端的Queue持久化 注意的是,如果已经声明了同名非持久化的Q
3、ueue,则再次声明无效。 发送方和接收方都需要指定该参数。 boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null); 2. Message持久化 channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); 负载分配 为了解决各个接收端工作量相差太大的问题(有的一直busy,有的空闲比较多),突破Round-
4、robin。 int prefetchCount = 1; channel.basicQos(prefetchCount); 意思为,最多为当前接收方发送一条消息。如果接收方还未处理完毕消息,还没有回发确认,就不要再给他分配消息了,应该把当前消息分配给其它空闲接收方。 固定关键词路由:Routing 使用类型为direct的exchange,发送特定关键词(RoutingKey)的消息给订阅该关键词的Queue。 场景示例:消息发送方发送了类型为[error][info]的两种消息,写磁盘的消息接受者只接受error类型的消息,Console打印的接收两者。 (上
5、图采用了不同颜色来作为routingKey) 发送方 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"/*exchange类型为direct*/); channel.basi
6、cPublish(EXCHANGE_NAME, "info"/*关键词=info*/, null, message.getBytes()); channel.close(); connection.close(); 接收方 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
7、channel.exchangeDeclare(EXCHANGE_NAME, "direct"/*exchange类型为direct*/); // 创建匿名Queue String queueName = channel.queueDeclare().getQueue(); // 订阅某个关键词,绑定到匿名Queue中 channel.queueBind(quueName,EXCHANGE_NAME,"error"); channel.queueBind(quueName,EXCHANGE_NAME,"info"); QueueingConsumer consumer = n
8、ew QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // Blocking... String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); // 可获取路由关键词 关键词模式路由:Topics
9、 这种模式可以看做对Routing的扩展。Routing只能使用固定关键词,而Topics模式可以订阅模糊关键词。 关键词必须是一组word,由点号分割。例如"xxx.yyy.zzz",限定255bytes。 * 表示一个word; # 表示0个或者多个word; 发送方 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channe
10、l = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"/*exchange类型*/); channel.basicPublish(EXCHANGE_NAME, "xxx.yyy"/*关键词routingKey*/, null, message.getBytes()); channel.close(); connection.close(); 接收方 ConnectionFactory factory = new ConnectionFactory(); factory.se
11、tHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"/*exchange类型*/); // 创建匿名Queue String queueName = channel.queueDeclare().getQueue(); // 订阅某个关键词,绑定到匿名Queue中 channel.queueBind(quueNam
12、e,EXCHANGE_NAME,"*.yyy"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // Blocking... String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); // 可获取路由关键词






