博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ消息中介之Python使用
阅读量:5011 次
发布时间:2019-06-12

本文共 8227 字,大约阅读时间需要 27 分钟。

 

本文介绍RabbitMQ在python下的基本使用 

 

 

1. RabbitMQ安装,安装RabbitMQ需要预安装erlang语言,Windows直接下载双击安装即可

  

  

 

2. RabbitMQ的基本使用

  发送消息端

# -*- coding:utf-8 -*-# Author:Wong Duimport pika# 生成RabbitMQ连接对象connection = pika.BlockingConnection(    pika.ConnectionParameters("localhost"))# 建立连接管道channel = connection.channel()# 进行队列声明,指定消息要发送到的队列# durable声明队列持久性,即RabbitMQ重启队列仍旧不丢失(不完全可靠)# 对现有队列进行durable声明更改无法生效,只有重新声明一个新的队列才生效channel.queue_declare(queue="hello4", durable=True)# 发送消息到RabbitMQ消息队列里channel.basic_publish(    exchange='',    # 指定交换器    routing_key="hello4",   # 要绑定的队列    body="Nice to meet you22223333...",     # 要发送的消息    properties=pika.BasicProperties(        delivery_mode=2,        # 通过delivery_mode=2将队列内的消息持久化    ))print("[x] sent 'Nice to meet you22233332...'")connection.close()

  接收消息端

# -*- coding:utf-8 -*-# Author:Wong Duimport pikaimport time# 生成RabbitMQ连接对象connection = pika.BlockingConnection(    pika.ConnectionParameters("localhost"))# 建立连接管道channel = connection.channel()# 声明队列名channel.queue_declare(queue="hello4", durable=True)# 定义构建回调函数def callback(ch, method, properties, body):    print(ch)    # time.sleep(20)    print("[x] received %r" % body)    # ch.basic_ack(delivery_tag= method.delivery_tag)  # 接收端回复消息给rabbixmq,代表该消息处理已完成 # 默认情况下,RabbitMQ循环把消息发送给consumer,# 通过basic_qos(prefetch_count=1)设置,可以在处理并确认完前一个消息之前,不再接收新信息# 即实现“能者多劳”的效果channel.basic_qos(prefetch_count=1) channel.basic_consume(    callback,    queue="hello4",    no_ack=True,  # 如果为True,rabbitmq会在consumer接收到数据时就删除队列中的消息)print("[*] waiting for messages. To exit press CTRL+C...")channel.start_consuming()

 

-----------------------拓展一吓,RabbitMQ消息转发类型:分类筛选,广播,正则匹配------------------------------

3. 分类筛选:RabbitMQ通过消息分类进行对应发送和接收

  发送消息端

1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3  4 import pika 5 import sys 6  7 # 生成RabbitMQ连接对象 8 connection = pika.BlockingConnection( 9     pika.ConnectionParameters('localhost')10 )11 12 # 建立连接管道13 channel = connection.channel()14 15 # 声明交换器和交换器转发消息类型;'direct'为分类筛选 型16 channel.exchange_declare(exchange='_direct',17                          exchange_type='direct')18 19 # 获取脚本命令:根据脚本命令,定义对应严重级别的消息(默认为info类消息)20 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'21 # 获取要发送给RabbitMQ的消息(默认为Hello world!)22 msg = "".join(sys.argv[2:]) or "Hello world!"23 '''例:python direct分类筛选_producer.py info 无关痛痒的消息24         表示给RabbitMQ发送一条info级别的消息,消息内容:无关痛痒的消息25 '''26 27 channel.basic_publish(exchange='_direct',   # 指定交换器28                       routing_key=severity, # 指定要筛选的字段29                       body=msg              # 要发送给RabbitMQ的消息30 )31 32 print("----exec done----")33 connection.close()
direct分类筛选_producer

  接收消息端

1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3  4 import pika 5 import sys 6  7 # 生成RabbitMQ连接对象 8 connection = pika.BlockingConnection( 9     pika.ConnectionParameters('localhost')10 )11 12 # 建立连接管道13 channel = connection.channel()14 15 # 声明交换器和交换器消息转发类型;'direct'为分类筛选 型16 channel.exchange_declare(exchange='_direct',17                          exchange_type='direct',)18 19 # 声明随机独家管道,用以接收RabbitMQ的消息20 Random_Queue = channel.queue_declare(exclusive=True)21 # 获取管道名22 queue_name = Random_Queue.method.queue23 24 # 获取脚本命令;获取相应严重级别的消息,可多个严重级别25 severities = sys.argv[1:]26 if not severities:27     sys.stderr.write("\033[31;1mUsage: %s [info] [warning] [error]\033[0m" % sys.argv)28     exit()29 30 for severity in severities:31     # 将交换器、队列和筛选分类绑定起来32     # 可以简单理解为:从交换器中通过rongting_key筛选消息后发送给queue33     channel.queue_bind(exchange='_direct',34                        queue=queue_name,35                        routing_key=severity)36 37 # 定义消息回调函数38 def callback(ch, method, properties, body):39     print(body)40 41 channel.basic_consume(42     callback,   # 获取消息后交给回调函数进行处理43     queue=queue_name,   # 从指定queue中获取消息44     no_ack=True         # 如果为True,rabbitmq会在consumer接收到数据时就删除队列中的消息45 )46 47 print("[*] waiting for messages. To exit press CTRL+C...")48 # 运行管道,一直接收消息,如无消息则为阻塞状态49 channel.start_consuming()
direct分类筛选_consumer

 

4. 广播:RabbitMQ通过广播发送消息给多个consumer,广播模式下发送消息是实时的,没有即时接收,则丢失

  发送消息端

1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3  4 ''' 5 RabbitMQ交换器广播模式, 6 当producer发送消息给RabbitMQ时, 7 RabbitMQ会立即转发给所有当前在线的consumer,然后将消息删除 8 如当前没有在线的consumer,消息则会丢失 9 即RabbitMQ下交换器广播模式的消息是实时的,当时发送,当时接收10 '''11 12 import pika13 14 # 生成RabbitMQ连接对象15 connection = pika.BlockingConnection(16     pika.ConnectionParameters("localhost")17 )18 19 # 建立连接管道20 channel = connection.channel()21 22 # 声明交换器及交换器消息转发类型;'fanout'为广播类型23 channel.exchange_declare(exchange="dudu",24                          # type="fanout",25                          exchange_type="fanout",)26 27 msg = "Hello world! Nice to meet you..."28 channel.basic_publish(exchange="dudu",  # 指定交换器29                       routing_key='',   # routing_key必须配置30                       body=msg)         # 要发送给RabbitMQ的消息31 32 print("---exec done---")33 connection.close()
fanout广播_producer

  接收消息端

1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3  4 ''' 5 RabbitMQ交换器广播模式, 6 当producer发送消息给RabbitMQ时, 7 RabbitMQ会立即转发给所有当前在线的consumer,然后将消息删除 8 如当前没有在线的consumer,消息则会丢失 9 即RabbitMQ下交换器广播模式的消息是实时的,当时发送,当时接收10 '''11 12 import pika13 14 # 生成RabbitMQ连接对象15 connection = pika.BlockingConnection(16     pika.ConnectionParameters("localhost")17 )18 19 # 建立连接管道20 channel = connection.channel()21 22 # 声明交换器和交换器消息转发类型;'fanout'为广播类型23 channel.exchange_declare(exchange="dudu",24                          exchange_type="fanout",)25 26 # 声明随机独家队列,用以接收RabbitMQ发来的消息27 Random_Queue = channel.queue_declare(exclusive=True)28 # 获取队列名29 queue_name = Random_Queue.method.queue30 31 # 将交换器和队列绑定在一起32 channel.queue_bind(exchange="dudu",33                    queue=queue_name,)34 35 # 定义消息处理回调函数36 def callback(ch, method, properties, body):37     print("[x] received %r" % body)38 39 channel.basic_consume(40     callback,   # 接收到消息后通过回调函数处理41     queue=queue_name,   # 配置获取消息的队列42     # no_ack=True,        # consumer接收到消息后RabbitMQ即删除掉该消息43 )44 45 print("[*] waiting for messages. To exit press CTRL+C...")46 # 运行管道,一直接收消息,如无消息则为阻塞状态47 channel.start_consuming()
fanout广播_consumer

 

5. 正则匹配:RabbitMQ通过关键字标记发送消息,通过匹配关键字接收消息

  发送消息端

1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3  4 ''' 5 RabbitMQ交换器的topic正则匹配模式, 6 可理解为direct分类筛选的升级版, 7 既能通过关键字标记消息内容,也能通过正则匹配标记关键字 8 ''' 9 10 import pika11 import sys12 13 # 生成RabbitMQ连接对象14 connection = pika.BlockingConnection(15     pika.ConnectionParameters('localhost')16 )17 18 # 建立连接管道19 channel = connection.channel()20 21 # 声明交换器和交换器消息转发类型;'topic'为正则匹配类型22 channel.exchange_declare(exchange='_topic',23                          exchange_type='topic')24 25 # 获取脚本命令;word用来标记msg,msg为消息内容本身26 word = sys.argv[1] if len(sys.argv) > 1 else 'any.info'27 msg = " ".join(sys.argv[2:]) or "Hello world!"28 29 30 channel.basic_publish(31     exchange='_topic',      # 指定交换器32     routing_key=word,       # 配置标记关键字33     body=msg                # 要发送给RabbitMQ的消息内容34 )35 36 print("----exec done----")37 connection.close()
topic匹配_producer

  接收消息端

1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3  4 ''' 5 RabbitMQ交换器的topic正则匹配模式, 6 可理解为direct分类筛选的升级版, 7 既能通过关键字标记消息内容,也能通过正则匹配标记关键字 8 ''' 9 10 import pika11 import sys12 13 # 生成RabbitMQ连接对象14 connection = pika.BlockingConnection(15     pika.ConnectionParameters('localhost')16 )17 18 # 建立连接管道19 chanel = connection.channel()20 21 # 声明交换器和交换器消息转发类型;'topic'为匹配类型22 chanel.exchange_declare(exchange='_topic',23                         exchange_type='topic')24 25 # 声明随机独家队列,用以接收RabbitMQ发来的消息26 Random_Queue = chanel.queue_declare(exclusive=True)27 # 获取随机队列队列名28 queue_name = Random_Queue.method.queue29 30 # 获取脚本命令;即匹配规则31 severities = sys.argv[1:]32 if not severities:33     print("Usage: %s [*.info] [mysql.*] [#]" % sys.argv)34     exit(1)35 36 for severity in severities:37     # 对交换器、队列、匹配规则进行绑定38     chanel.queue_bind(exchange='_topic',39                       queue=queue_name,40                       routing_key=severity,)41 42 # 构建消息处理回调函数43 def callback(ch, method, properties, body):44     print(body)45 46 chanel.basic_consume(47     callback,48     queue=queue_name,49     no_ack=True,50 )51 52 print("[*] waiting for messages. To exit press CTRL+C...")53 # 运行管道,一直接收消息,如无消息则为阻塞状态54 chanel.start_consuming()
topic匹配_consumer

 

转载于:https://www.cnblogs.com/Caiyundo/p/9554448.html

你可能感兴趣的文章
guid
查看>>
Python中出现“TabError: inconsistent use of tabs and spaces in indentation”问题的解决
查看>>
ajax请求
查看>>
js学习总结----DOM增删改和应用
查看>>
希尔伯特矩阵(Hilbert matrix)
查看>>
(20)sopel算法
查看>>
学习总结 javascript 闭包
查看>>
实验吧一个小坑注入
查看>>
【 D3.js 高级系列 — 8.0 】 打标
查看>>
Mac必备软件推荐
查看>>
Android Gson深入分析
查看>>
display:flow-root
查看>>
判读字符串是否为空的全局宏-分享
查看>>
iOS中Block的基础用法
查看>>
mac 终端 使用ftp命令
查看>>
22-reverseString-Leetcode
查看>>
Centos 开机自动联网
查看>>
cocos2dx使用lua和protobuf
查看>>
HDOJ 5630 Rikka with Chess
查看>>
netcore2.1 在后台运行一个任务
查看>>