基础概念

RabbitMQ 是一个消息代理:它接受和转发消息。用户可以将通过RabbitMQ传递消息,类比成邮政系统,在这个系统中RabbitMQ充当邮箱(投递消息)、邮局(消息路由)和信件载体(消息载体)的角色。

在RabbitMQ的消息模型中涉及一下概念:

  • Producer:生产者,发送消息message到RabbitMQ Broker(部署MQ服务的节点)
  • Consumer:消费者,从MQ服务获取特定消息并处理
  • Queue:消息队列,存在在MQ节点上的消息缓存,占用节点的内存和磁盘资源
  • Exchange:消息路由器

在RabbitMQ的messaging model中,Producer从不直接向Queue发送任何消息,只能将数据发到exchange中。exchange的类型决定如何将message发送到queue。

Exchange包括以下四种类型:

  • fanout:将message广播到绑定该exchange的所有queue,发送到该exchange的message忽略routing_key值

  • direct:将message发送(广播)到binding key和routing key一致的queue中,对于一对exchange和queue用户可以指定多个binding key

默认exchange实际上是特殊的direct exchange,默认exchange不需要进行binding操作,当Producer指定的exchange为'‘时,message发到默认exchange。每一个新创建的Queue都是用其名称自动bind到该exchange

  • topic:基于字符串匹配的dispatch方式,Producer将Routing key定义为多个特征词连接的字符串,而Consumer将根据自己感兴趣的特征词定义Binding Key。topic exchange的匹配规则为 * 匹配一个任意特征词,# 匹配一个或者多个特征词。

    上述示例中,lazy.orange.elephant将被发送到Q1和Q2,lazy.pink.rabbit将被发送到Q2而不会被发送到Q1。当特征词的个数不匹配时message会被丢弃,如quick.orange.male.rabbit

    当Binding Key中不使用*和#字符时,topic 类型和direct等价

  • headers:该类型的Exchange忽略routing key,基于attribute来路由消息

参考RabbitMQ的基础概念:https://www.rabbitmq.com/tutorials/amqp-concepts.html

示例

1.日志系统

Producer代码,运行时根据传入的参数发送一条消息,其中第一个删除表示routing_key

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
#!/usr/bin/env python
import pika
import sys

auth = pika.PlainCredentials(username="default_user_buOS998-Ga7ngE0s4gd", password="VGBC_2CC5_HVziWh1qPOAYZ5Z8Cazz3e")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", port=31564, credentials=auth)))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

Consumer代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/usr/bin/env python
import pika
import sys

auth = pika.PlainCredentials(username="default_user_buOS998-Ga7ngE0s4gd", password="VGBC_2CC5_HVziWh1qPOAYZ5Z8Cazz3e")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", port=31564, credentials=auth)))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 创建独占的Queue,exclusive=True表示退出时删除Queue
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

参考:https://www.rabbitmq.com/tutorials/tutorial-five-python.html

2.RPC调用

参考:https://www.rabbitmq.com/tutorials/tutorial-six-python.html

FAQ

1.共用Queue

多个Consumer绑定到相同Queue,默认情况下,RabbitMQ 将按顺序将每条message发送给某个消费者,每个消费者会得到数量大致相同的message,即round-robin模式分发消息。 通过channel.basic_qos(prefetch_count=1)参数可以保证不会向正在处理数据的Consumer发送消息,即公平调度(Fair dispatch)

2. 数据持久

默认情况下Consumer需要主动进行acknowledgments,且强制acknowledgement时间为30min。用户可以指定auto_ack为true,RabbitMQ将message传递给Consumer,它立即将其标记为删除,即”至少送达一次“。

生产环境中,创建queue时需要指定durable=True,防止broker退出时queue消失,并且当Producer发送数据时,也需要指定delivery_mode保证message是持久化的。

在pika中可以参考以下代码:

1
2
3
4
5
6
7
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(exchange='',
                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
                  ))

由于RabbitMQ在接收Message时有短暂的时间窗口持久化数据,因此上述方式并不能100%保证Queue的可靠,用户需要publisher confirms机制来保证这一点。

publisher confirms 参考:https://www.rabbitmq.com/tutorials/tutorial-seven-java.html

参考

官方文档