网站首页 > 基础教程 正文
目录
1, JMS消息协议规范
2, Python集成ActiveMQ
3, 封装服务mq_service.py
4, 接收处理消息mq_listener.py
5, 启动消息监听服务mq.py
6, 单元测试test_mq_serivce.py
7, 发送消息功能调用
8, 常见问题和解决方法
ActiveMQ是一个非常流行的消息队列服务中间件,实现JMS规范,基于STOMP协议(端口为61613)支持Python访问。
STOMP:Simple(or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议
JMS:Java Message Service
一, JMS消息协议规范
JMS规范定义了2类消息发送接收模型:点对点queue,发布订阅topic,区别是能够重复消费和是否保存。
1, 点对点queue:不可重复消费,消息被消费前一直保存。
生产者发送消息到queue,一个消费者取出并消费消息。
消息被消费后,queue中不再保存,所以只有一个消费者能够取到消息。
queue支持多个消费者存在,但是一个消息只有一个消费者可以消费。
当前没有消费者时,消息一直保存,直到被消费者消费。
2, 发布订阅topic:可重复消费,发布给所有订阅者。
生产者发布消息到topic中,多个订阅者收到并消费消息。
和queue不同,发布到topic中的消息会被所有订阅者消费。
当生产者发布消息时,不管是否有订阅者,都不保存消息。
JMS规范定义的2类消息传输模型queue和topic比较:
Python集成ActiveMQ使用stomp.py,只需简单配置,本文在Django框架下进一步封装服务mq_service.py。典型系统架构示意图和消息队列:
时序图如下:
示例代码:hello_activemq
├── settings.py
├── mq
│ └── mq_service.py
│ └── mq_listener.py
├── tests
│ └── test_mq_service.py
├── management
│ └── commands
│ └── mq.py
二, Python集成ActiveMQ
1. 新建Django项目
运行命令:django-admin startproject hello_activemq
2. 进到目录hello_activemq,增加应用
运行命令:python manage.py startapp app
项目文件结构如下:
3. 安装stomp.py
pip install stomp.py >= 5.0.1
三, 封装服务mq_service.py,调用ActiveMQ发送消息
1. 增加mq_service.py
2. 打开settings.py,配置ActiveMQ信息
3. 为了增加代码的兼容和容错能力,封装get_conn(), close_conn()等辅助函数,详见代码文件mq_service.py。
四, 接收处理消息mq_listener.py
1. 增加mq_listener.py,声明消息处理类,继承stomp.ConnectionListener:
2. 在on_message()函数中,将消息字符串解析为JSON,方便业务处理。
3. 声明on_error()函数处理错误信息。
五, 启动消息监听服务mq.py
1. 将循环接收消息代码封装成函数consume_msg(),增加在服务中mq_serivce.py:
2. 调用set_listener()设置消息接收类实例,使用之前创建的MqListener
3. 调用subscribe()订阅消息,启动循环监听。
4. 将启动服务代码封装成command,在目录management/commands中增加mq.py
5. 运行命令python manage.py mq,看到消息提示,启动监听服务成功。
六, 单元测试test_mq_service.py
增加测试函数,发送消息。
运行python manage.py test,同时看到监听服务收到并处理消息。
七, 发送消息功能调用
1. 在views.py中发送消息,调用mq_servcie.py
2. 在urls.py中配置路由
3. 运行命令启动服务
python manage.py runserver 0.0.0.0:8001
4. REST接口发送消息
八, 常见问题和解决方法
1,启动服务错误:[transport.py: 787, attempt_connection] Could not connect to host 127.0.0.1, port 61613
解决:检查ActiveMQ是否正常启动,特别注意是否开启STOMP协议端口61613
原因:Python连接ActiveMQ使用STOMP协议,端口默认61613
2,发送消息时错误:TypeError: message should be a string or bytes, found <class 'dict'>
解决:将消息内容序列化为JSON,发送时调用json.dumps(),接收时调用json.loads()
原因:Python连接ActiveMQ使用的是STOMP协议,消息格式为简单文本。
注:JMS规范定义的5类消息:
字符串TextMessage,
键值对MapMessage,
序列化对象ObjectMessage
字节流BytesMessage
数据流StreamMessage
ActiveMQ支持5类JMS消息,增加了二进制大文件消息BlobMessage:
3,跨系统对接时接收到的消息类型不是TextMessage
Python开发的业务处理服务 -> Java开发的API服务,接收到的消息类型为BytesMessage,Python发送时设置conn.send('xx', msg_str, content_type="text/plain")仍然接收不到期望的类型TextMessage
解决:stomp建立连接时配置参数
conn = stomp.Connection10([("localhost", 61613)], auto_content_length=False)
原因:Python连接ActiveMQ使用STOMP协议,消息格式为简单文本,不携带类型信息,只通过header中的content-length来判断TextMessage和BytesMessage,所以发送消息时不在header中添加content-length就可以了。
猜你喜欢
- 2024-10-31 如何用 GitHub Actions 写出高质量的 Python代码?
- 2024-10-31 python 魔法方法连载三 __setattr()__
- 2024-10-31 CentOS 7下编译安装Python3 centos7安装python3.7
- 2024-10-31 3分钟掌握Python 中的集合 python中集合的概念
- 2024-10-31 Python3 集合 python 集合 discard
- 2024-10-31 群晖安装python3 群晖安装python3.7
- 2024-10-31 Python的设计还是很精妙的,三分钟理解__get__和__set__
- 2024-10-31 十六、Python集合set常用方法 python set集合和list集合的区别
- 2024-10-31 python数据类型-集合set python set集合取值
- 2024-10-31 python笔记18:set 集合 python set集合取值
- 最近发表
- 标签列表
-
- jsp (69)
- gitpush (78)
- gitreset (66)
- python字典 (67)
- dockercp (63)
- gitclone命令 (63)
- dockersave (62)
- linux命令大全 (65)
- pythonif (86)
- location.href (69)
- dockerexec (65)
- tail-f (79)
- queryselectorall (63)
- location.search (79)
- bootstrap教程 (74)
- 单例 (62)
- linuxgzip (68)
- 字符串连接 (73)
- html标签 (69)
- c++初始化列表 (64)
- mysqlinnodbmyisam区别 (63)
- arraylistadd (66)
- mysqldatesub函数 (63)
- window10java环境变量设置 (66)
- c++虚函数和纯虚函数的区别 (66)