Python实战:手把手教你用paho-mqtt编写第一个MQTT发布者

2026/03/25 MQTT 共 5001 字,约 15 分钟

Python实战:手把手教你用paho-mqtt编写第一个MQTT发布者

在万物互联的时代,设备间的轻量级、高效通信变得至关重要。MQTT(消息队列遥测传输)协议正是为此而生,它凭借其轻量、低功耗、低带宽占用和发布/订阅模式,成为了物联网(IoT)领域的首选通信协议。今天,我们就从最基础的部分开始,使用Python和paho-mqtt库,编写一个能够向世界“喊话”的MQTT发布者程序。

一、 环境准备与核心概念

在开始编码之前,我们需要确保环境就绪,并理解几个核心概念。

1.1 安装必备库

打开你的终端或命令提示符,使用pip安装paho-mqtt库。这是Eclipse基金会维护的官方MQTT Python客户端库,功能强大且稳定。

pip install paho-mqtt

1.2 理解MQTT的核心角色

  • 发布者(Publisher): 我们即将编写的程序角色。它负责产生并向特定“主题”发送消息。
  • 订阅者(Subscriber): 监听特定“主题”并接收该主题下消息的程序。
  • 代理(Broker): MQTT消息的中转站,负责接收发布者的消息,并将其转发给所有订阅了该主题的订阅者。你可以把它想象成邮局或消息交换机。
  • 主题(Topic): 一个层级化的字符串(如 home/living-room/temperature),用于标识消息的类型或目的地。订阅者通过订阅主题来接收感兴趣的消息。

我们的目标:编写一个发布者,连接到一个公共的MQTT代理,并向一个主题发布一条“Hello, MQTT!”消息。

1.3 选择一个MQTT代理

为了测试,我们可以使用免费的公共MQTT代理。这里我们选择 test.mosquitto.org(由Mosquitto项目提供)在端口1883上进行非加密连接。

注意:公共代理仅用于测试和学习,切勿用于生产环境或传输敏感信息。

二、 编写你的第一个发布者

现在,让我们创建一个名为 mqtt_publisher_simple.py 的Python文件。

2.1 导入库与定义参数

# mqtt_publisher_simple.py
import paho.mqtt.client as mqtt
import time

# MQTT代理服务器地址
BROKER = "test.mosquitto.org"
PORT = 1883
# 客户端ID,需要唯一。如果为None,库会自动生成一个。
CLIENT_ID = "MyFirstPublisher_Python"
# 要发布消息的主题
TOPIC = "python/mqtt/firsthello"
# 要发送的消息内容
MESSAGE = "Hello, MQTT World from Python!"
# 服务质量等级 (0: 最多一次, 1: 至少一次, 2: 仅一次)
QOS = 1

参数解释

  • QoS (服务质量):这是MQTT的一个重要特性。
    • 0: “最多一次”,消息可能丢失,但传输最快。
    • 1: “至少一次”,确保消息送达,但可能重复。
    • 2: “仅一次”,确保消息恰好送达一次,最可靠但开销最大。根据你的业务场景选择,我们这里用1

2.2 创建MQTT客户端并设置回调函数

paho-mqtt采用基于回调的异步模型。我们需要定义一些函数,在特定事件发生时被自动调用。

# 连接成功回调函数
def on_connect(client, userdata, flags, rc):
    """
    当客户端收到来自服务器的CONNACK响应(连接结果)时被调用。
    rc参数决定了连接是否成功。
    """
    if rc == 0:
        print(f"[连接成功] 已连接到代理: {BROKER}:{PORT}")
        # 连接成功后,可以在这里订阅主题(对于发布者不是必须的)
    else:
        print(f"[连接失败] 错误码: {rc}")

# 发布消息回调函数(当消息从客户端发出时触发)
def on_publish(client, userdata, mid):
    """
    当一条消息完成发送到代理后(对于QoS 0),或者代理确认收到后(对于QoS 1,2)被调用。
    mid是消息ID。
    """
    print(f"[发布确认] 消息ID {mid} 已发送。")

# 创建客户端实例
client = mqtt.Client(client_id=CLIENT_ID, protocol=mqtt.MQTTv311)

# 将回调函数赋值给客户端
client.on_connect = on_connect
client.on_publish = on_publish

2.3 建立连接、发布消息并断开

try:
    # 1. 连接到代理
    print(f"[正在连接] 尝试连接到 {BROKER}:{PORT} ...")
    client.connect(BROKER, PORT, keepalive=60) # keepalive: 心跳间隔,单位秒

    # 启动网络循环线程,在后台处理消息收发和心跳
    client.loop_start()

    # 等待连接建立(简单处理,生产环境应用更健壮的逻辑)
    time.sleep(2)

    # 2. 发布消息
    print(f"[准备发布] 主题: {TOPIC}, 消息: {MESSAGE}, QoS: {QOS}")
    # publish() 方法会返回一个消息ID (mid) 和结果码 (rc)
    result = client.publish(topic=TOPIC, payload=MESSAGE, qos=QOS, retain=False)
    
    # result是一个元组 (rc, mid)
    status = result[0]
    if status == 0:
        print(f"[发布成功] 消息已加入发送队列。")
    else:
        print(f"[发布失败] 状态码: {status}")

    # 等待一下,确保发布回调被触发
    time.sleep(2)

except Exception as e:
    print(f"[发生异常] {e}")
finally:
    # 3. 断开连接并清理
    print("[清理] 正在断开连接...")
    client.loop_stop() # 停止网络循环
    client.disconnect() # 发送DISCONNECT包
    print("[完成] 程序结束。")

代码解析

  1. client.connect():发起与MQTT代理的TCP连接。
  2. client.loop_start():这是关键一步。它启动一个后台线程,负责:
    • 维持与代理的心跳(keepalive)。
    • 发送网络数据(如我们的发布消息)。
    • 接收来自代理的数据(如连接确认、发布确认)。 如果不调用这个,连接将无法真正工作。
  3. client.publish():发布消息的核心方法。retain=False表示这不是一条保留消息(代理不会为新订阅者保存它)。
  4. client.loop_stop()client.disconnect():优雅地关闭连接。

三、 运行与测试

在终端运行你的脚本:

python mqtt_publisher_simple.py

如果一切顺利,你将看到类似以下的输出:

[正在连接] 尝试连接到 test.mosquitto.org:1883 ...
[连接成功] 已连接到代理: test.mosquitto.org:1883
[准备发布] 主题: python/mqtt/firsthello, 消息: Hello, MQTT World from Python!, QoS: 1
[发布成功] 消息已加入发送队列。
[发布确认] 消息ID 1 已发送。
[清理] 正在断开连接...
[完成] 程序结束。

如何验证消息真的发出去了?

你可以使用以下任何一种方式作为订阅者来验证:

  1. 使用命令行工具mosquitto_sub (如果你安装了Mosquitto):
    mosquitto_sub -h test.mosquitto.org -t "python/mqtt/firsthello" -v
    

    运行此命令后,再运行你的发布者脚本,你将在终端看到收到的消息。

  2. 使用在线MQTT客户端: 访问 HiveMQ Web ClientMQTTX Web,连接到 test.mosquitto.org:8080 (WebSocket端口),订阅主题 python/mqtt/firsthello,然后运行你的发布者,网页上就会实时显示消息。

四、 进阶:一个更实用的循环发布者

上面的例子是“一次性”的。一个典型的物联网传感器发布者可能需要定期发送数据。让我们改进一下,模拟一个温度传感器,每隔5秒发布一次数据。

创建新文件 mqtt_publisher_sensor.py

# mqtt_publisher_sensor.py
import paho.mqtt.client as mqtt
import time
import json
import random

BROKER = "test.mosquitto.org"
PORT = 1883
CLIENT_ID = "PythonTempSensor"
TOPIC = "home/livingroom/sensor/temperature"
QOS = 1

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print(f"传感器已上线,开始上报数据...")
    else:
        print(f"连接失败,代码: {rc}")

def on_publish(client, userdata, mid):
    # 可以在这里记录日志,这里简单打印
    # print(f"消息 {mid} 确认送达")
    pass

client = mqtt.Client(client_id=CLIENT_ID)
client.on_connect = on_connect
client.on_publish = on_publish

try:
    client.connect(BROKER, PORT, 60)
    client.loop_start()
    time.sleep(1) # 等待连接

    sensor_id = "sensor_001"
    print(f"模拟传感器 ID: {sensor_id}")
    print(f"发布主题: {TOPIC}")
    print("按 Ctrl+C 停止...\n")

    while True:
        # 模拟生成温度数据 (18.0 - 25.0 度之间)
        temperature = round(18.0 + random.random() * 7, 2)
        # 构造一个结构化的JSON消息,比纯文本更通用
        payload = {
            "sensor_id": sensor_id,
            "timestamp": int(time.time()),
            "value": temperature,
            "unit": "°C"
        }
        message = json.dumps(payload) # 将字典转换为JSON字符串

        print(f"[{time.strftime('%H:%M:%S')}] 发布: {message}")
        client.publish(TOPIC, message, qos=QOS)

        time.sleep(5) # 等待5秒

except KeyboardInterrupt:
    print("\n用户中断,正在关闭...")
except Exception as e:
    print(f"错误: {e}")
finally:
    client.loop_stop()
    client.disconnect()
    print("传感器已离线。")

这个进阶示例展示了:

  • 结构化数据:使用JSON格式,便于订阅者解析和处理。
  • 模拟真实场景:循环发布、包含设备ID、时间戳和单位。
  • 优雅退出:通过捕获 KeyboardInterrupt (Ctrl+C) 来安全关闭程序。

五、 总结与下一步

恭喜!你已经成功使用Python和paho-mqtt库编写了你的第一个MQTT发布者程序。我们涵盖了从环境搭建、核心概念理解、基础单次发布到模拟循环传感器发布的完整流程。

关键要点

  1. paho-mqtt 使用回调机制处理网络事件。
  2. 必须调用 loop_start()loop_forever() 来启动网络通信线程。
  3. QoS主题 的设计需要根据实际应用需求仔细考量。

下一步你可以探索

  • 编写订阅者:使用 client.on_message 回调来接收和处理消息。
  • 本地部署Broker:安装 MosquittoEMQX 到本地机器或服务器。
  • 安全连接:学习使用TLS/SSL加密连接以及用户名/密码认证。
  • 持久化与遗嘱消息:设置 clean_sessionLast Will and Testament,让设备异常离线时通知其他客户端。

希望这篇教程为你打开了MQTT世界的大门。现在,去让你的设备和应用开始对话吧!

文档信息

Search

    Table of Contents