介绍(from wikipedia)

定义:RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,服务端用Erlang编写的。
可伸缩性:集群服务
消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存

安装

https://www.rabbitmq.com/documentation.html

概念

RabbitMQ

  • Broker:服务器实体,如192.168.10.1:5672,localhost:5672
  • Producer:消息的生产者,产生的消息Publish到RabbitMQ的Server
  • Consumer:消息的消费者,从RabbitMQ的Server消费Message
  • Message:消息,由Delivery Mode、Headers、Properties、Body
    组成,Delivery Mode分为Persistent和Non-Persistent;Header为自定义键值对,name可以为任意字符串;Properties值为枚举,如priority优先级、content_type传输格式、expiration、content_encoding、timestamp等高级特性;Body为消息体内容(Payload)。消息有3种状态,Ready待消费、Unacked待应答、Total总数。
  • Exchange:交换器,接收Producer发送的Message,并路由Route到队列Queue。Exchange类型(分发策略)有Direct、fanout、topic和headers(不常用),Direct为通过routing-key完全匹配;fanout为广播模式,会转发消息到所有绑定的queue上,不做过滤;topic为模式匹配,通过通配符#(任意个单词)和通配符*(匹配1个单词)进行匹配
  • Routing key:exchange通过Routing key以及路由规则决定将消息路由到某个queue
  • Queue:队列,用来保存(buffer)消息的容器直到被消费者消费
  • Vhost:virtual host虚拟主机,默认为”/“,Vhost可以理解为”完全隔离”,不同的用户可以对不同的vhost拥有不同的权限
  • Binding: 绑定,用于queue和exchange关联,多对多的关系
  • Channel:信道,虚拟连接,当发布或使用队列中的Message时通过Channel完成

代码

dependency:

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.0</version>
</dependency>

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
final String QUEUE_NAME = "queue_name_demo";
ConnectionFactory factory = new ConnectionFactory();
/*
* 设置连接信息,也可以通过factory.setHost等设置
*/
factory.setUri("amqp://guest:guest@localhost:5672");

Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
/*
* 声明一个queue
* 如果队列不存在且没有Declare时,Publish的message将丢失
* 如果队列不存在但Declare队列时,会创建对应的queue
* 如果队列已存在但Declare参数与服务端不一致也会报错
* 入参分别为:
* queue – queue名
* durable - 是否持久化队列
* exclusive - 独占式队列
* autoDelete - 服务器将在不再使用时将其删除
* arguments - 其他额外参数,如无则传null
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "这里是消息内容";
/*
* 发送消息
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
channel.close();
connection.close();

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

final Channel channel = connection.createChannel();
// 入参以实际的为准,第2个参数为Boolean类型
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 消费回调函数,处理业务逻辑
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("接收消息 : " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, consumer);
/*
* channel和connection在数据消费完之后再关闭,测试时可以注释掉下面2行
*/
//channel.close();
//connection.close();