使用python操作RabbitMQ

使用python操作RabbitMQ

黄鹏宇 496 2022-11-16

基本概述

一、使用 pika作为客户端

Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library.

二、几个名词/配置

2.1 ACK机制

acknowledgment:消息确认

1、什么是消息确认ACK。

答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。

2、RabbitMQ的ACK的消息确认机制。

ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,MQ收到反馈后才将此消息从队列中删除。消息的ACK确认机制默认是打开的。

如果一个消费者在处理消息出现了网络不稳、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列。
如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务。
消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。

3、ACK机制的开发注意事项?

如果消费者发生异常,ack没法送消息应答。,Message会一直重新分发。然后RabbitMQ会占用越来越多的内容,由于RabbitMQ会长时间运行,因此这个"内存泄漏"是致命的。

4.怎么解决ack的内存泄漏问题?

(1)在程序处理中可以进行异常捕获,保证消费者的程序正常执行。

(2)使用RabbitMQ的ack的配置确认机制。(开启重试次数)

(3)手动设置消息应答。如果消费端异常,也返回应答成功,再把未消费成功的数据记录下来,进行补偿。

2.2 RabbitMQ的持久化

交换机持久化

将交换机设置成为持久化的交换机,是为了在RabbitMQ重启之后,交换机还存在,以保证可以和队列继续进行沟通。如果一个交换机被长期使用,建议将其设置为持久化的。

exchangeDeclare(NAME,TYPE,durable=True)

2.3队列持久化
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)

durable = true

2.4消息持久化

basic_publish(exchange,routing_key,body,properties=pika.BasicProperties(delivery_mode=2))
delivery_mode=2表示持久化

3. 队列的其他属性

channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)

  1. exclusive

是否为排他的。如果一个队列被声明为排他队列,该队列 仅对首次声明它的连接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时自动删除 ;

  1. autoDelete

是否自动删除 ;如果autoDelete = true,当所有消费者都与这个队列断开连接时,这个队列会自动删除。

  1. arguments

设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priority 等。

  • x-max-length:
    消息条数限制,该参数是非负整数值。限制加入queue中消息的条数。先进先出原则,超过10条后面的消息会顶替前面的消息。

  • x-max-length-bytes
    消息容量限制,该参数是非负整数值。该参数和x-max-length目的一样限制队列的容量,但是这个是靠队列大小(bytes)来达到限制。

  • x-message-ttl
    消息存活时间,该参数是非负整数值.创建queue时设置该参数可指定消息在该queue中待多久,可根据x-dead-letter-routing-key和x-dead-letter-exchange生成可延迟的死信队列。

  • x-max-priority
    消息优先级,创建queue时arguments可以使用x-max-priority参数声明优先级队列 。该参数应该是一个整数,表示队列应该支持的最大优先级。建议使用1到10之间。目前使用更多的优先级将消耗更多的资源(Erlang进程)。
    设置该参数同时设置死信队列时或造成已过期的低优先级消息会在未过期的高优先级消息后面执行。该参数会造成额外的CPU消耗。

  • x-expires
    存活时间,创建queue时参数arguments设置了x-expires参数,该queue会在x-expires到期后queue消息,亲身测试直接消失(哪怕里面有未消费的消息)。

  • x-dead-letter-exchange和x-dead-letter-routing-key
    创建queue时参数arguments设置了x-dead-letter-routing-key和x-dead-letter-exchange,会在x-message-ttl时间到期后把消息放到x-dead-letter-routing-key和x-dead-letter-exchange指定的队列中达到延迟队列的目的。

4. 交换机的其他属性

exchangeDeclare(String exchange,
                            String type,
                            boolean durable,
                            boolean autoDelete,
                            boolean internal,
                            Map<String, Object> arguments) ;

Protocol Extensions

二、交换机模式

1. 广播模式 Fanout

image-1668582080246

1.1 特点

1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

1.2 代码

【init.py】
import pika
import json

QUEUE_NAME_1 = "queue1"
QUEUE_NAME_2 = "queue2"
QUEUE_NAME_LIST = [QUEUE_NAME_1,QUEUE_NAME_2]

credentials = pika.PlainCredentials('admin', '123456') 

# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/test',credentials=credentials))

# 建⽴rabbit协议的通道
channel = connection.channel()

# 声明一个交换机,并为广播类型
channel.exchange_declare(exchange='exchange_1', exchange_type='fanout',durable=True)

# 声明队列,并绑定交换机
for queueName in QUEUE_NAME_LIST:
    result = channel.queue_declare(queue=queueName, exclusive=False)
    channel.queue_bind(exchange='exchange_1', queue=queueName)

# 关闭与rabbitmq server的连接
connection.close()

【producer.py】

import pika
import json

credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/test',credentials=credentials))
channel = connection.channel()

for i in range(10):
    message=json.dumps({'exchange_1':"1000%s"%i})
    # 当交换机为fanout时,routing_key不生效
    channel.basic_publish(exchange='exchange_1',routing_key='',body=message)
    
# 关闭与rabbitmq server的连接
connection.close()
【consumer.py】
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/test',credentials=credentials))
channel = connection.channel()

# 定义⼀个回调函数来处理消息队列中的消息,这⾥是打印出来
def callback(ch, method, properties, body):
    # ⼿动发送确认消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
    msg = body.decode()
    print(f"CONSUMER_NAME:{CONSUMER_NAME}\nQUEUE_NAME:{QUEUE_NAME}\nmsg:{msg}")

#channel.basicQos(10); 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=QUEUE_NAME,on_message_callback=callback,auto_ack=False)
# auto_ack=True) # ⾃动发送确认消息
# 开始接收信息,并进⼊阻塞状态,队列⾥有信息才会调⽤callback进⾏处理
channel.start_consuming()

1.3 结果:

总共发送了10条消息,C1_1 + C1_2 == C2 == 10条
image-1668582059172

2. Direct

image-1668583520885

2.1 特点

  1. routing_key可以指定消息只发送到特定的队列中
  2. 一个队列可以绑定多个routing_key
    image-1668588442602

2.2 代码

【init.py】
import pika
import json

HOST = "localhost"
EXCHANGE_NAME = "exchange_direct"

# info
QUEUE_NAME_1 = "queue1"

# info,debug,error
QUEUE_NAME_2 = "queue2"

ROUTING_KEY_ERROR = "error"
ROUTING_KEY_DEBUG = "debug"
ROUTING_KEY_INFO = "info"

credentials = pika.PlainCredentials('admin', '123456') 

# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST,port=5672,virtual_host='/test',credentials=credentials))

# 建⽴rabbit协议的通道
channel = connection.channel()

# 声明一个交换机,并为DIRECT类型
channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='direct',durable=True)

# 声明队列,并绑定交换机
result = channel.queue_declare(queue=QUEUE_NAME_1, exclusive=False)
channel.queue_bind(exchange=EXCHANGE_NAME, queue=QUEUE_NAME_1,routing_key=ROUTING_KEY_INFO)

result = channel.queue_declare(queue=QUEUE_NAME_2, exclusive=False)
channel.queue_bind(exchange=EXCHANGE_NAME, queue=QUEUE_NAME_2,routing_key=ROUTING_KEY_ERROR)
channel.queue_bind(exchange=EXCHANGE_NAME, queue=QUEUE_NAME_2,routing_key=ROUTING_KEY_DEBUG)
channel.queue_bind(exchange=EXCHANGE_NAME, queue=QUEUE_NAME_2,routing_key=ROUTING_KEY_INFO)


# 关闭与rabbitmq server的连接
connection.close()

【producer.py】
# DIRECT 交换机

import pika
import json

HOST = "localhost"

ROUTING_KEY_ERROR = "error"
ROUTING_KEY_DEBUG = "debug"
ROUTING_KEY_INFO = "info"

EXCHANGE_NAME = "exchange_direct"

credentials = pika.PlainCredentials('admin', '123456') # mq⽤户名和密码,没有则需要⾃⼰创建
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST,port=5672,virtual_host='/test',credentials=credentials))
# 建⽴rabbit协议的通道
channel = connection.channel()

for i in range(10):
    message=json.dumps({EXCHANGE_NAME:"1000%s"%i})
    # 当交换机为fanout时,routing_key不生效
    channel.basic_publish(exchange=EXCHANGE_NAME,routing_key='',body=message)

for i in range(12):
    message = json.dumps({EXCHANGE_NAME:"1000%s"%i})
    if i % 3 ==0:
         # 确保消息是持久的,设置消息持久化,将要发送的消息的属性标记为2,表示该消息要持久化
        channel.basic_publish(exchange=EXCHANGE_NAME,routing_key=ROUTING_KEY_ERROR,body=message,properties=pika.BasicProperties(delivery_mode=2)) 
   
    if i % 3 ==1:
        channel.basic_publish(exchange=EXCHANGE_NAME,routing_key=ROUTING_KEY_DEBUG,body=message,properties=pika.BasicProperties(delivery_mode=2))

    if i % 3 ==2:
        channel.basic_publish(exchange=EXCHANGE_NAME,routing_key=ROUTING_KEY_INFO,body=message,properties=pika.BasicProperties(delivery_mode=2))

# 关闭与rabbitmq server的连接
connection.close()
【consumer1.py】

import pika

QUEUE_NAME = "queue1"
CONSUMER_NAME = "C1"

HOST = "localhost"

credentials = pika.PlainCredentials('admin', '123456')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST,port=5672,virtual_host='/test',credentials=credentials))
channel = connection.channel()

# 定义⼀个回调函数来处理消息队列中的消息,这⾥是打印出来
def callback(ch, method, properties, body):
    # ⼿动发送确认消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
    msg = body.decode()
    print(f"CONSUMER_NAME:{CONSUMER_NAME}\nQUEUE_NAME:{QUEUE_NAME}\nmsg:{msg}\nproperties:{properties}\nmethod:{method}\nch:{ch}")
    print("========================")

#channel.basicQos(10); 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=QUEUE_NAME,on_message_callback=callback,auto_ack=False)

# auto_ack=True) # ⾃动发送确认消息
# 开始接收信息,并进⼊阻塞状态,队列⾥有信息才会调⽤callback进⾏处理
channel.start_consuming()
【consumer2.py】
import pika

QUEUE_NAME = "queue2"
CONSUMER_NAME = "C2"

HOST = "localhost"

credentials = pika.PlainCredentials('admin', '123456')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST,port=5672,virtual_host='/test',credentials=credentials))
channel = connection.channel()

# 定义⼀个回调函数来处理消息队列中的消息,这⾥是打印出来
def callback(ch, method, properties, body):
    # ⼿动发送确认消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
    msg = body.decode()
    print(f"CONSUMER_NAME:{CONSUMER_NAME}\nQUEUE_NAME:{QUEUE_NAME}\nmsg:{msg}\nproperties:{properties}\nmethod:{method}\nch:{ch}")
    print("========================")
#channel.basicQos(10); 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=QUEUE_NAME,on_message_callback=callback,auto_ack=False)
# auto_ack=True) # ⾃动发送确认消息
# 开始接收信息,并进⼊阻塞状态,队列⾥有信息才会调⽤callback进⾏处理
channel.start_consuming()

2.3 结果

可以看到C2收到了所有消息,C1只收到了info的消息

3. Topic

image-1668583528357

3.1 代码

【init	.py】

import pika
import json

HOST = "localhost"
EXCHANGE_NAME = "exchange_topic_a4"

# info
QUEUE_NAME_1 = "topic_queue1"
QUEUE_NAME_2 = "topic_queue2"
QUEUE_NAME_3 = "topic_queue3"

ROUTING_KEY_UNIFORM = "a4.uniform_msg.#"
ROUTING_KEY_LOG = "a4.log.*"
ROUTING_KEY_PAYMENT = "a4.payment.*"

credentials = pika.PlainCredentials('admin', '123456') 

# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST,port=5672,virtual_host='/test',credentials=credentials))

# 建⽴rabbit协议的通道
channel = connection.channel()

# 声明一个交换机,并为DIRECT类型
channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='topic',durable=True)

# 声明队列,并绑定交换机
result = channel.queue_declare(queue=QUEUE_NAME_1, exclusive=False)
channel.queue_bind(exchange=EXCHANGE_NAME, queue=QUEUE_NAME_1,routing_key=ROUTING_KEY_UNIFORM)

result = channel.queue_declare(queue=QUEUE_NAME_2, exclusive=False)
channel.queue_bind(exchange=EXCHANGE_NAME, queue=QUEUE_NAME_2,routing_key=ROUTING_KEY_LOG)


result = channel.queue_declare(queue=QUEUE_NAME_3, exclusive=False)
channel.queue_bind(exchange=EXCHANGE_NAME, queue=QUEUE_NAME_3,routing_key=ROUTING_KEY_PAYMENT)


# 关闭与rabbitmq server的连接
connection.close()

【producer.py】
# TOPIC 交换机

import pika
import json

HOST = "localhost"

ROUTING_KEY_UNIFORM_MORNING = "a4.uniform_msg.morning"
ROUTING_KEY_LOG_ERROR = "a4.log.error"
ROUTING_KEY_PAYMENT_SUCCESS = "a4.payment.success"
ROUTING_KEY_PAYMENT_REFUND = "a4.payment.refund"

EXCHANGE_NAME = "exchange_topic_a4"

credentials = pika.PlainCredentials('admin', '123456') # mq⽤户名和密码,没有则需要⾃⼰创建
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST,port=5672,virtual_host='/test',credentials=credentials))
# 建⽴rabbit协议的通道
channel = connection.channel()

for i in range(12):
    if i % 4 ==0:
        message = json.dumps({"body":"发生了错误,请保存并发送邮件"},ensure_ascii=False)
        channel.basic_publish(exchange=EXCHANGE_NAME,routing_key=ROUTING_KEY_LOG_ERROR,body=message,properties=pika.BasicProperties(delivery_mode=2)) 
   
    if i % 4 ==1:
        message = json.dumps({"body":"发送统一模板消息"},ensure_ascii=False)
        channel.basic_publish(exchange=EXCHANGE_NAME,routing_key=ROUTING_KEY_UNIFORM_MORNING,body=message,properties=pika.BasicProperties(delivery_mode=2))

    if i % 4 ==2:
        message = json.dumps({"body":"有人付款成功,openid为xxx"},ensure_ascii=False)
        channel.basic_publish(exchange=EXCHANGE_NAME,routing_key=ROUTING_KEY_PAYMENT_SUCCESS,body=message,properties=pika.BasicProperties(delivery_mode=2))
    
    if i % 4 ==3:
        message = json.dumps({"body":"有人退款,openid为xxx,请处理"},ensure_ascii=False)
        channel.basic_publish(exchange=EXCHANGE_NAME,routing_key=ROUTING_KEY_PAYMENT_REFUND,body=message,properties=pika.BasicProperties(delivery_mode=2))


# 关闭与rabbitmq server的连接
connection.close()