介绍(from wikipedia)
定义:RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,服务端用Erlang编写的。
可伸缩性:集群服务
消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存
安装
https://www.rabbitmq.com/documentation.html
概念

- Broker:服务器实体,如192.168.10.1:5672,localhost:5672
- Producer:消息的生产者,产生的消息Publish到RabbitMQ的Server
- Consumer:消息的消费者,从RabbitMQ的Server消费Message
- Message:消息,由Delivery Mode、Headers、Properties、Body
组成,Delivery Mode
分为Persistent和Non-Persistent;Header
为自定义键值对,name可以为任意字符串;Properties
值为枚举,如priority优先级、content_type传输格式、expiration、content_encoding、timestamp等高级特性;Body
为消息体内容(Payload)。消息有3种状态,Ready待消费、Unacked待应答、Total总数。
- Exchange:交换器,接收Producer发送的Message,并路由Route到队列Queue。Exchange类型(分发策略)有Direct、fanout、topic和headers(不常用),Direct为通过routing-key完全匹配;fanout为广播模式,会转发消息到所有绑定的queue上,不做过滤;topic为模式匹配,通过通配符#(任意个单词)和通配符*(匹配1个单词)进行匹配
- Routing key:exchange通过Routing key以及路由规则决定将消息路由到某个queue
- Queue:队列,用来保存(buffer)消息的容器直到被消费者消费
- Vhost:virtual host虚拟主机,默认为”/“,Vhost可以理解为”完全隔离”,不同的用户可以对不同的vhost拥有不同的权限
- Binding: 绑定,用于queue和exchange关联,多对多的关系
- Channel:信道,虚拟连接,当发布或使用队列中的Message时通过Channel完成
代码
dependency:
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.0</version> </dependency>
|
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; final String QUEUE_NAME = "queue_name_demo"; ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://guest:guest@localhost:5672");
Connection connection = factory.newConnection(); final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "这里是消息内容";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); channel.close(); connection.close();
|
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope;
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收消息 : " + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer);
|