Python操作RabbitMQ
RabbitMQ 介绍
RabbitMQ 是一个由 erlang 开发的 AMQP(Advanced Message Queue )的开源实现的产品,RabbitMQ 是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即 message 发送者以下简称 P,相对应的“消费者”乃 message 接收者以下简称 C,message 通过 queue 由 P 到 C,queue 存在于 RabbitMQ,可存储尽可能多的 message,多个 P 可向同一 queue 发送 message,多个 C 可从同一个 queue 接收 message
-
内部架构:

-
说明
- Message (消息):RabbitMQ 转发的二进制对象,包括 Headers(头)、Properties (属性)和 Data (数据),其中数据部分不是必要的。Producer(生产者): 消息的生产者,负责产生消息并把消息发到交换机
-
Exhange 的应用。
- Consumer (消费者):使用队列 Queue 从 Exchange 中获取消息的应用。
- Exchange (交换机):负责接收生产者的消息并把它转到到合适的队列
-
Queue (队列):一个存储 Exchange 发来的消息的缓冲,并将消息主动发送给 Consumer,或者 Consumer 主动来获取消息。见 1.4 部分的描述。
-
Binding (绑定):队列 和 交换机 之间的关系。Exchange 根据消息的属性和 Binding 的属性来转发消息。绑定的一个重要属性是 binding_key。
-
Connection (连接)和 Channel (通道):生产者和消费者需要和 RabbitMQ 建立 TCP 连接。一些应用需要多个 connection,为了节省 TCP 连接,可以使用 Channel,它可以被认为是一种轻型的共享 TCP 连接的连接。连接需要用户认证,并且支持 TLS (SSL)。连接需要显式关闭。
Python 操作 RabbitMQ
1. 实现简单消息队列:
一个 Product 向 queue 发送一个 message,一个 Client 从该 queue 接收 message 并打印
- 发消息 product
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672, )) #定义连接池
channel = connection.channel()
channel.queue_declare(queue='test') #声明队列以向其发送消息消息
channel.basic_publish(exchange='', routing_key='test', body='Hello World!') #注意当未定义 exchange 时,routing_key 需和 queue 的值保持一致
print('send success msg to rabbitmq')
connection.close() #关闭连接
- 收消息,client
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.queue_declare(queue='test')
def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)channel.basic_consume(callback,queue='test',no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
执行效果:
#product 端: send success msg to rabbitmq#client 端:
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World!'
- 消息确认
当客户端从队列中取出消息之后,可能需要一段时间才能处理完成,如果在这个过程中,客户端出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了,因为 rabbitmq 默认会把此消息标记为已完成,然后从队列中移除,
消息确认是客户端从 rabbitmq 中取出消息,并处理完成之后,会发送一个 ack 告诉 rabbitmq,消息处理完成,当 rabbitmq 收到客户端的获取消息请求之后,或标记为处理中,当再次收到 ack 之后,才会标记为已完成,然后从队列中删除。当 rabbitmq 检测到客户端和自己断开链接之后,还没收到 ack,则会重新将消息放回消息队列,交给下一个客户端处理,保证消息不丢失,也就是说,RabbitMQ 给了客户端足够长的时间来做数据处理。
在客户端使用 no_ack 来标记是否需要发送 ack,默认是 False,开启状态
product 向 rabbitmq 发送两条消息:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672, )) #定义连接池
channel = connection.channel() #声明队列以向其发送消息消息
channel.queue_declare(queue='test')
channel.basic_publish(exchange='', routing_key='test', body='1')
channel.basic_publish(exchange='', routing_key='test', body='2')
channel.basic_publish(exchange='', routing_key='test', body='3')
print('send success msg to rabbitmq')
connection.close() #关闭连接
客户端接受消息,不发送 ack
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.queue_declare(queue='test')
def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(5)
#ch.basic_ack(delivery_tag = method.delivery_tag) #发送 ack 消息channel.basic_consume(callback,queue='test',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
执行结果,发现消息并没有从队列中删除
第一次执行: [*] Waiting for messages. To exit press CTRL+C [x] Received b'1' [x] Received b'2' [x] Received b'3' 第二次执行: [*] Waiting for messages. To exit press CTRL+C [x] Received b'1' [x] Received b'2' [x] Received b'3'
加入 ack 之后:
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.queue_declare(queue='test')
def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(5)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送 ack 消息channel.basic_consume(callback,queue='test',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
运行结果:发现第二次运行队列中已经没有消息
第一次: [*] Waiting for messages. To exit press CTRL+C [x] Received b'1' [x] Received b'2' [x] Received b'3 第二次: [*] Waiting for messages. To exit press CTRL+C
- 改变消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者 1 去队列中获取 奇数 序列的任务,消费者 1 去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
默认情况:使用 product 往队列中放 10 个数字
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672, )) #定义连接池
channel = connection.channel() #声明队列以向其发送消息消息
channel.queue_declare(queue='test')
for i in range(10):
channel.basic_publish(exchange='', routing_key='test', body=str(i))
print('send success msg[%s] to rabbitmq' %i)
connection.close() #关闭连接
运行结果;
send success msg[1] to rabbitmq send success msg[2] to rabbitmq send success msg[3] to rabbitmq send success msg[4] to rabbitmq send success msg[5] to rabbitmq send success msg[6] to rabbitmq send success msg[7] to rabbitmq send success msg[8] to rabbitmq send success msg[9] to rabbitmq
客户端 1 收消息:
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.queue_declare(queue='test')
def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)
#time.sleep(5)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送 ack 消息channel.basic_consume(callback,queue='test',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
运行结果:
[*] Waiting for messages. To exit press CTRL+C [x] Received b'0' [x] Received b'2' [x] Received b'4' [x] Received b'6' [x] Received b'8'
客户端 2 收消息:和 client1 的区别是加了一个 sleep(1)
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.queue_declare(queue='test')
def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送 ack 消息
channel.basic_consume(callback,queue='test',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
执行结果:
[*] Waiting for messages. To exit press CTRL+C [x] Received b'1' [x] Received b'3' [x] Received b'5' [x] Received b'7' [x] Received b'9'
在两个客户端里加入 channel.basic_qos(prefetch_count=1) 参数
客户端 1:
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.queue_declare(queue='test')
def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)
##time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送 ack 消息
channel.basic_qos(prefetch_count=1) #添加不按顺序分配消息的参数
channel.basic_consume(callback,queue='test',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
执行效果:
[*] Waiting for messages. To exit press CTRL+C [x] Received b'0' [x] Received b'2' [x] Received b'3' [x] Received b'4' [x] Received b'5' [x] Received b'6' [x] Received b'7' [x] Received b'8' [x] Received b'9'
客户端 2:
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.queue_declare(queue='test')
def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送 ack 消息channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='test',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
执行结果:
[*] Waiting for messages. To exit press CTRL+C [x] Received b'1'
发现,加入 channel.basic_qos(prefetch_count=1) 参数之后,客户端 2 由于 sleep 了 1s,所以只拿到了一个消息,其他的消息都被 client1 拿到了
- 消息持久化 消息确认机制使得客户端在崩溃的时候,服务端消息不丢失,但是如果 rabbitmq 奔溃了呢?该如何保证队列中的消息不丢失? 此就需要 product 在往队列中 push 消息的时候,告诉 rabbitmq,此队列中的消息需要持久化,用到的参数:durable=True,再次强调,Producer 和 client 都应该去创建这个 queue,尽管只有一个地方的创建是真正起作用的:
channel.basic_publish(exchange='', routing_key="test", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
具体代码:
product 端:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672, )) #定义连接池
channel = connection.channel() #声明队列以向其发送消息消息
channel.queue_declare(queue='test_persistent',durable=True)
for i in range(10):
channel.basic_publish(exchange='', routing_key='test_persistent', body=str(i),properties=pika.BasicProperties(delivery_mode=2))
print('send success msg[%s] to rabbitmq' %i)
connection.close() #关闭连接
client 端:
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.queue_declare(queue='test_persistent',durable=True)
def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)
#time.sleep(5)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送 ack 消息channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='test_persistent',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
注意:client 端也需配置 durable=True,否则将报下面错误:pika.exceptions.ChannelClosed: (406, "PRECONDITION_FAILED - parameters for queue 'test_persistent' in vhost '/' not equivalent")
配置完之后,发现 product 往 rabbitmq 端 push 消息之后,重启 rabbitmq,消息依然存在
[root@dns ~]# rabbitmqctl list_queues Listing queues ... abc 0 abcd 0 hello2 300 test 0 test1 20 test_persistent 10 ...done. [root@dns ~]# /etc/init.d/rabbitmq-server restart Restarting rabbitmq-server: SUCCESS rabbitmq-server. [root@dns ~]# rabbitmqctl list_queues Listing queues ... abc 0 abcd 0 hello2 300 test1 20 test_persistent 10 ...done.
参考文档:参考文档:http://www.rabbitmq.com/tutorials/tutorial-two-python.html
2. 使用 Exchanges:
exchanges 主要负责从 product 那里接受 push 的消息,根据 product 定义的规则,投递到 queue 中,是 product 和 queue 的中间件
-
Exchange 类型
- direct 关键字类型
- topic 模糊匹配类型
- fanout 广播类型
-
使用 fanout 实现发布订阅者模型

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ 实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中
订阅者:
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.exchange_declare(exchange='test123',type='fanout') #定义一个 exchange , 类型为 fanout
rest = channel.queue_declare(exclusive=True) #创建一个随机队列, 并启用 exchange
queue_name = rest.method.queue #获取队列名
channel.queue_bind(exchange='test123',queue=queue_name) #将随机队列名和 exchange 进行绑定def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送 ack 消息channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
发布者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672, )) #定义连接池
channel = connection.channel() #声明队列以向其发送消息消息
channel.exchange_declare(exchange='test123',type='fanout')
for i in range(10):
channel.basic_publish(exchange='test123', routing_key='', body=str(i),properties=pika.BasicProperties(delivery_mode=2))
print('send success msg[%s] to rabbitmq' %i)
connection.close() #关闭连接
注意:
需先定义订阅者,启动订阅者,否则发布者 publish 到一个不存在的 exchange 是被禁止的。如果没有 queue bindings exchange 的话,msg 是被丢弃的。
- 使用 direct 实现根据关键字发布消息

消息发布订阅者模型是发布者发布一条消息,所有订阅者都可以收到,现在 rabbitmq 还支持根据关键字发送,在发送消息的时候使用 routing_key 参数指定关键字,rabbitmq 的 exchange 会判断 routing_key 的值,然后只将消息转发至匹配的队列,注意,此时需要订阅者先创建队列
配置参数为 exchange 的 type=direct,然后定义 routing_key 即可
订阅者 1: 订阅 error,warning,info
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.exchange_declare(exchange='test321',type='direct') #定义一个 exchange , 类型为 fanout
rest = channel.queue_declare(exclusive=True) #创建一个随机队列, 并启用 exchange
queue_name = rest.method.queue #获取队列名
severities = ['error','warning','info'] #定义三个 routing_keyfor severity in severities:
channel.queue_bind(exchange='test321', routing_key=severity,queue=queue_name)def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送 ack 消息channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
订阅者 2:订阅 error,warning
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.exchange_declare(exchange='test321',type='direct') #定义一个 exchange , 类型为 fanout
rest = channel.queue_declare(exclusive=True) #创建一个随机队列, 并启用 exchange
queue_name = rest.method.queue #获取队列名
severities = ['error','warning'] #定义两个 routing_keyfor severity in severities:
channel.queue_bind(exchange='test321', routing_key=severity,queue=queue_name)def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送 ack 消息channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
发布者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672, )) #定义连接池
channel = connection.channel() #声明队列以向其发送消息消息
channel.exchange_declare(exchange='test321',type='direct')
channel.basic_publish(exchange='test321', routing_key='info', body='info msg',properties=pika.BasicProperties(delivery_mode=2)) #发送 info msg 到 info routing_key
channel.basic_publish(exchange='test321', routing_key='error', body='error msg',properties=pika.BasicProperties(delivery_mode=2)) #发送 error msg 到 error routing_keyprint('send success msg[] to rabbitmq')
connection.close() #关闭连接 **
效果:发现订阅者 1 和订阅者 2 都收到 error 消息,但是只有订阅者 1 收到了 info 消息
订阅者 1: [*] Waiting for messages. To exit press CTRL+C [x] Received b'info msg' [x] Received b'error msg' 订阅者 2: [*] Waiting for messages. To exit press CTRL+C [x] Received b'error msg'
- 使用 topic 实现模糊匹配发布消息
direct 实现了根据自定义的 routing_key 来标示不同的 queue,使用 topic 可以让队列绑定几个模糊的关键字,之后发送者将数据发送到 exchange,exchange 将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列
# 表示可以匹配 0 个 或 多个 单词
* 表示只能匹配 一个 单词
如:
fuzj.test 和 fuzj.test.test
fuzj.# 会匹配到 fuzj.test 和 fuzj.test.test
fuzj.* 只会匹配到 fuzj.test
订阅者 1: 使用 #匹配
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.exchange_declare(exchange='test333',type='topic') #定义一个 exchange , 类型为 fanout
rest = channel.queue_declare(exclusive=True) #创建一个随机队列, 并启用 exchange
queue_name = rest.method.queue #获取队列名
channel.queue_bind(exchange='test333', routing_key='test.#',queue=queue_name)def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送 ack 消息channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
订阅者 2:使用*匹配
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()channel.exchange_declare(exchange='test333',type='topic') #定义一个 exchange , 类型为 fanout
rest = channel.queue_declare(exclusive=True) #创建一个随机队列, 并启用 exchange
queue_name = rest.method.queue #获取队列名
channel.queue_bind(exchange='test333', routing_key='test.*',queue=queue_name)def callback(ch, method, properties, body):
'''回调函数, 处理从 rabbitmq 中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送 ack 消息channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
发布者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672, )) #定义连接池
channel = connection.channel() #声明队列以向其发送消息消息
channel.exchange_declare(exchange='test333',type='topic')
channel.basic_publish(exchange='test333', routing_key='test.123', body='test.123 msg',properties=pika.BasicProperties(delivery_mode=2))
channel.basic_publish(exchange='test333', routing_key='test.123.321', body=' test.123.321 msg',properties=pika.BasicProperties(delivery_mode=2))print('send success msg[] to rabbitmq')
connection.close() #关闭连接
输出效果:
订阅者 1: [*] Waiting for messages. To exit press CTRL+C [x] Received b'test.123 msg' [x] Received b' test.123.321 msg'订阅者 2:
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'test.123 msg'
-
实现 RPC

-
过程:
-
客户端 Client 设置消息的 routing key 为 Service 的队列 op_q,设置消息的 reply-to 属性为返回的 response 的目标队列 reponse_q,设置其 correlation_id 为以随机 UUID,然后将消息发到 exchange。比如
channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)
-
Exchange 将消息转发到 Service 的 op_q
-
Service 收到该消息后进行处理,然后将 response 发到 exchange,并设置消息的 routing_key 为原消息的 reply_to 属性,以及设置其 correlation_id 为原消息的 correlation_id 。
ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response)) -
Exchange 将消息转发到 reponse_q
-
Client 逐一接受 response_q 中的消息,检查消息的 correlation_id 是否为等于它发出的消息的 correlation_id,是的话表明该消息为它需要的 response。
-
-
代码实现:
- 服务端:
import pika import subprocess connection = pika.BlockingConnection(pika.ConnectionParameters( host='127.0.0.1', port=5672, )) #定义连接池 channel = connection.channel() #创建通道 channel.queue_declare(queue='rpc_queue') #创建 rpc_queue 队列def operating(arg):
p = subprocess.Popen(arg, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) #执行系统命令
res = p.stdout.read() #取出标准输出
if not res: #判断是否有执行结果
responses_msg = p.stderr.read() #没有执行结果则取出标准错误输出
else:
responses_msg = res
return responses_msgdef on_request(ch, method, props, body):
command = str(body,encoding='utf-8')
print(" [.] start Processing command : %s" % command)
response_msg = operating(body) #调用函数执行命令
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = props.correlation_id),body=str(response_msg))
ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1) #消息不平均分配, 谁取谁得
channel.basic_consume(on_request, queue='rpc_queue') #监听队列print(" [x] Awaiting RPC requests")
channel.start_consuming()
- 客户端
import pika import uuid import timeclass FibonacciRpcClient(object):
def init(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1',port=5672,)) #定义连接池
self.channel = self.connection.channel() #创建通道
result = self.channel.queue_declare(exclusive=True,auto_delete=True) #创建客户端短接受服务端回应消息的队列,\exclusive=True 表示只队列只允许当前链接进行连接,auto_delete=True 表示自动删除
self.callback_queue = result.method.queue #获取队列名称
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue) #从队列中获取消息<span style="color: rgba(0, 0, 255, 1)">def</span><span style="color: rgba(0, 0, 0, 1)"> on_response(self, ch, method, props, body): </span><span style="color: rgba(0, 0, 255, 1)">if</span> self.corr_id == props.correlation_id: <span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">判断</span> self.response =<span style="color: rgba(0, 0, 0, 1)"> body </span><span style="color: rgba(0, 0, 255, 1)">def</span><span style="color: rgba(0, 0, 0, 1)"> call(self, n): self.response </span>=<span style="color: rgba(0, 0, 0, 1)"> None self.corr_id </span>=<span style="color: rgba(0, 0, 0, 1)"> str(uuid.uuid4()) self.channel.basic_publish(exchange</span>=<span style="color: rgba(128, 0, 0, 1)">''</span><span style="color: rgba(0, 0, 0, 1)">, routing_key</span>=<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">rpc_queue</span><span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(0, 0, 0, 1)">, properties</span>=<span style="color: rgba(0, 0, 0, 1)">pika.BasicProperties( reply_to </span>= self.callback_queue, <span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">回应消息的队列</span> correlation_id = self.corr_id, <span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">correlation id可以理解为请求的唯一标识码</span>
),
body=str(n))
while self.response is None: #不断从自己监听的队列里取消息, 直到取到消息
self.connection.process_data_events()
return self.response.decode()fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting server" )
time.sleep(0.1)
while True:
command = input('>> ')
response = fibonacci_rpc.call(command)
print(" [.] Get %r \n" % response)