本文介绍RabbitMQ在python下的基本使用
1. RabbitMQ安装,安装RabbitMQ需要预安装erlang语言,Windows直接下载双击安装即可
2. RabbitMQ的基本使用
发送消息端
# -*- coding:utf-8 -*-# Author:Wong Duimport pika# 生成RabbitMQ连接对象connection = pika.BlockingConnection( pika.ConnectionParameters("localhost"))# 建立连接管道channel = connection.channel()# 进行队列声明,指定消息要发送到的队列# durable声明队列持久性,即RabbitMQ重启队列仍旧不丢失(不完全可靠)# 对现有队列进行durable声明更改无法生效,只有重新声明一个新的队列才生效channel.queue_declare(queue="hello4", durable=True)# 发送消息到RabbitMQ消息队列里channel.basic_publish( exchange='', # 指定交换器 routing_key="hello4", # 要绑定的队列 body="Nice to meet you22223333...", # 要发送的消息 properties=pika.BasicProperties( delivery_mode=2, # 通过delivery_mode=2将队列内的消息持久化 ))print("[x] sent 'Nice to meet you22233332...'")connection.close()
接收消息端
# -*- coding:utf-8 -*-# Author:Wong Duimport pikaimport time# 生成RabbitMQ连接对象connection = pika.BlockingConnection( pika.ConnectionParameters("localhost"))# 建立连接管道channel = connection.channel()# 声明队列名channel.queue_declare(queue="hello4", durable=True)# 定义构建回调函数def callback(ch, method, properties, body): print(ch) # time.sleep(20) print("[x] received %r" % body) # ch.basic_ack(delivery_tag= method.delivery_tag) # 接收端回复消息给rabbixmq,代表该消息处理已完成 # 默认情况下,RabbitMQ循环把消息发送给consumer,# 通过basic_qos(prefetch_count=1)设置,可以在处理并确认完前一个消息之前,不再接收新信息# 即实现“能者多劳”的效果channel.basic_qos(prefetch_count=1) channel.basic_consume( callback, queue="hello4", no_ack=True, # 如果为True,rabbitmq会在consumer接收到数据时就删除队列中的消息)print("[*] waiting for messages. To exit press CTRL+C...")channel.start_consuming()
-----------------------拓展一吓,RabbitMQ消息转发类型:分类筛选,广播,正则匹配------------------------------
3. 分类筛选:RabbitMQ通过消息分类进行对应发送和接收
发送消息端
1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3 4 import pika 5 import sys 6 7 # 生成RabbitMQ连接对象 8 connection = pika.BlockingConnection( 9 pika.ConnectionParameters('localhost')10 )11 12 # 建立连接管道13 channel = connection.channel()14 15 # 声明交换器和交换器转发消息类型;'direct'为分类筛选 型16 channel.exchange_declare(exchange='_direct',17 exchange_type='direct')18 19 # 获取脚本命令:根据脚本命令,定义对应严重级别的消息(默认为info类消息)20 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'21 # 获取要发送给RabbitMQ的消息(默认为Hello world!)22 msg = "".join(sys.argv[2:]) or "Hello world!"23 '''例:python direct分类筛选_producer.py info 无关痛痒的消息24 表示给RabbitMQ发送一条info级别的消息,消息内容:无关痛痒的消息25 '''26 27 channel.basic_publish(exchange='_direct', # 指定交换器28 routing_key=severity, # 指定要筛选的字段29 body=msg # 要发送给RabbitMQ的消息30 )31 32 print("----exec done----")33 connection.close()
接收消息端
1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3 4 import pika 5 import sys 6 7 # 生成RabbitMQ连接对象 8 connection = pika.BlockingConnection( 9 pika.ConnectionParameters('localhost')10 )11 12 # 建立连接管道13 channel = connection.channel()14 15 # 声明交换器和交换器消息转发类型;'direct'为分类筛选 型16 channel.exchange_declare(exchange='_direct',17 exchange_type='direct',)18 19 # 声明随机独家管道,用以接收RabbitMQ的消息20 Random_Queue = channel.queue_declare(exclusive=True)21 # 获取管道名22 queue_name = Random_Queue.method.queue23 24 # 获取脚本命令;获取相应严重级别的消息,可多个严重级别25 severities = sys.argv[1:]26 if not severities:27 sys.stderr.write("\033[31;1mUsage: %s [info] [warning] [error]\033[0m" % sys.argv)28 exit()29 30 for severity in severities:31 # 将交换器、队列和筛选分类绑定起来32 # 可以简单理解为:从交换器中通过rongting_key筛选消息后发送给queue33 channel.queue_bind(exchange='_direct',34 queue=queue_name,35 routing_key=severity)36 37 # 定义消息回调函数38 def callback(ch, method, properties, body):39 print(body)40 41 channel.basic_consume(42 callback, # 获取消息后交给回调函数进行处理43 queue=queue_name, # 从指定queue中获取消息44 no_ack=True # 如果为True,rabbitmq会在consumer接收到数据时就删除队列中的消息45 )46 47 print("[*] waiting for messages. To exit press CTRL+C...")48 # 运行管道,一直接收消息,如无消息则为阻塞状态49 channel.start_consuming()
4. 广播:RabbitMQ通过广播发送消息给多个consumer,广播模式下发送消息是实时的,没有即时接收,则丢失
发送消息端
1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3 4 ''' 5 RabbitMQ交换器广播模式, 6 当producer发送消息给RabbitMQ时, 7 RabbitMQ会立即转发给所有当前在线的consumer,然后将消息删除 8 如当前没有在线的consumer,消息则会丢失 9 即RabbitMQ下交换器广播模式的消息是实时的,当时发送,当时接收10 '''11 12 import pika13 14 # 生成RabbitMQ连接对象15 connection = pika.BlockingConnection(16 pika.ConnectionParameters("localhost")17 )18 19 # 建立连接管道20 channel = connection.channel()21 22 # 声明交换器及交换器消息转发类型;'fanout'为广播类型23 channel.exchange_declare(exchange="dudu",24 # type="fanout",25 exchange_type="fanout",)26 27 msg = "Hello world! Nice to meet you..."28 channel.basic_publish(exchange="dudu", # 指定交换器29 routing_key='', # routing_key必须配置30 body=msg) # 要发送给RabbitMQ的消息31 32 print("---exec done---")33 connection.close()
接收消息端
1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3 4 ''' 5 RabbitMQ交换器广播模式, 6 当producer发送消息给RabbitMQ时, 7 RabbitMQ会立即转发给所有当前在线的consumer,然后将消息删除 8 如当前没有在线的consumer,消息则会丢失 9 即RabbitMQ下交换器广播模式的消息是实时的,当时发送,当时接收10 '''11 12 import pika13 14 # 生成RabbitMQ连接对象15 connection = pika.BlockingConnection(16 pika.ConnectionParameters("localhost")17 )18 19 # 建立连接管道20 channel = connection.channel()21 22 # 声明交换器和交换器消息转发类型;'fanout'为广播类型23 channel.exchange_declare(exchange="dudu",24 exchange_type="fanout",)25 26 # 声明随机独家队列,用以接收RabbitMQ发来的消息27 Random_Queue = channel.queue_declare(exclusive=True)28 # 获取队列名29 queue_name = Random_Queue.method.queue30 31 # 将交换器和队列绑定在一起32 channel.queue_bind(exchange="dudu",33 queue=queue_name,)34 35 # 定义消息处理回调函数36 def callback(ch, method, properties, body):37 print("[x] received %r" % body)38 39 channel.basic_consume(40 callback, # 接收到消息后通过回调函数处理41 queue=queue_name, # 配置获取消息的队列42 # no_ack=True, # consumer接收到消息后RabbitMQ即删除掉该消息43 )44 45 print("[*] waiting for messages. To exit press CTRL+C...")46 # 运行管道,一直接收消息,如无消息则为阻塞状态47 channel.start_consuming()
5. 正则匹配:RabbitMQ通过关键字标记发送消息,通过匹配关键字接收消息
发送消息端
1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3 4 ''' 5 RabbitMQ交换器的topic正则匹配模式, 6 可理解为direct分类筛选的升级版, 7 既能通过关键字标记消息内容,也能通过正则匹配标记关键字 8 ''' 9 10 import pika11 import sys12 13 # 生成RabbitMQ连接对象14 connection = pika.BlockingConnection(15 pika.ConnectionParameters('localhost')16 )17 18 # 建立连接管道19 channel = connection.channel()20 21 # 声明交换器和交换器消息转发类型;'topic'为正则匹配类型22 channel.exchange_declare(exchange='_topic',23 exchange_type='topic')24 25 # 获取脚本命令;word用来标记msg,msg为消息内容本身26 word = sys.argv[1] if len(sys.argv) > 1 else 'any.info'27 msg = " ".join(sys.argv[2:]) or "Hello world!"28 29 30 channel.basic_publish(31 exchange='_topic', # 指定交换器32 routing_key=word, # 配置标记关键字33 body=msg # 要发送给RabbitMQ的消息内容34 )35 36 print("----exec done----")37 connection.close()
接收消息端
1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3 4 ''' 5 RabbitMQ交换器的topic正则匹配模式, 6 可理解为direct分类筛选的升级版, 7 既能通过关键字标记消息内容,也能通过正则匹配标记关键字 8 ''' 9 10 import pika11 import sys12 13 # 生成RabbitMQ连接对象14 connection = pika.BlockingConnection(15 pika.ConnectionParameters('localhost')16 )17 18 # 建立连接管道19 chanel = connection.channel()20 21 # 声明交换器和交换器消息转发类型;'topic'为匹配类型22 chanel.exchange_declare(exchange='_topic',23 exchange_type='topic')24 25 # 声明随机独家队列,用以接收RabbitMQ发来的消息26 Random_Queue = chanel.queue_declare(exclusive=True)27 # 获取随机队列队列名28 queue_name = Random_Queue.method.queue29 30 # 获取脚本命令;即匹配规则31 severities = sys.argv[1:]32 if not severities:33 print("Usage: %s [*.info] [mysql.*] [#]" % sys.argv)34 exit(1)35 36 for severity in severities:37 # 对交换器、队列、匹配规则进行绑定38 chanel.queue_bind(exchange='_topic',39 queue=queue_name,40 routing_key=severity,)41 42 # 构建消息处理回调函数43 def callback(ch, method, properties, body):44 print(body)45 46 chanel.basic_consume(47 callback,48 queue=queue_name,49 no_ack=True,50 )51 52 print("[*] waiting for messages. To exit press CTRL+C...")53 # 运行管道,一直接收消息,如无消息则为阻塞状态54 chanel.start_consuming()