In this article, we will learn about “How to publish and subscribe to an EMQX-MQTT broker” using python. Before getting into the topic let us freshen up about MQTT protocol. MQTT is a lightweight Internet of Things (IoT) messaging protocol based on publish/subscribe model. In which the broker(EMQX-MQTT) acts as a middleman between the publisher(Who publish the message) and the subscriber(Who subscribe to the message), relaying messages from one to the other.
However, this article primarily explains “How to use the paho-mqtt client in the Python project” and also “How to implement connection, subscribe, messaging, and other functions between the client and EMQX-MQTT broker”.
The use of EMQX Broker and Python Paho MQTT client
Creating cloud EMQX-MQTT broker
The free version of EMQX-broker will be used in this article. This service is built on the MQTT IoT cloud platform. The broker’s access information is as follows:
- Firstly, we have to create an account in EMQX-broker
2. After creating an account in EMQX-broker. Secondly, we have to create a “New deployment”.
4. Click “+ Create deployment” to create a cloud server. After entering required values, finally click “Deploy” to create your deployment.
5. At last, your successfully created deployment will appear as below.
6. Don’t forget to note down your address and port details of your deployment.
7. Click the “EMQ X Dashboard” to create a “Username and Password” for your connection.
Once the “EMQ X Dashboard” tab opens click “Users & ACL” to add your username and password to your server. Finally, click “Add” to save the entered details.
Python code for publish and subscribe to EMQX-MQTT broker
Import the Paho MQTT client
The Paho Python Client provides a client class with support for both MQTT v3.1 and v3.1.1 on Python 2.7 or 3.x. It also provides some helper functions to make publishing one off messages to an MQTT server very straightforward.
from paho.mqtt import client as mqtt_client
Set the EMQX-MQTT Broker connection parameter
First, set the MQTT Broker connection’s address, port, and topic. Simultaneously, we use the Python function "random.randint
” to generate a random MQTT client id.
#Connect address of your deployment
BROKER = 'p70e2b8e.en.emqx.cloud
#Connect ports of your deployment
PORT = 12482
TOPIC = "SUBCRIBE AND PUBLISH TO AN EMQX-MQTT BROKER"
# generate client ID with pub prefix randomly
CLIENT_ID = "python-mqtt-tcp-sub-{id}".format(id=random.randint(0, 1000))
USERNAME = 'IOTEDU'
PASSWORD = 'public'
Write the EMQX-MQTT connect function
In the following code, we use “on_connect “as connect call back function. This function will be called after the client has been connected, and we can check if the client has been successfully connected using “rc"
in this function. Typically, we’ll build a MQTT client at the same time and connect it to “p70e2b8e.en.emqx.cloud”(broker).
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
client.subscribe(TOPIC)
else:
print("Failed to connect, return code {rc}".format(rc=rc), )”
def connect_mqtt():
client = mqtt_client.Client(CLIENT_ID)
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.connect(BROKER, PORT)
return client
Write the EMQX-MQTT Publish message function
Firstly, we start by creating a while loop. We’ll use the MQTT client “publish
” function in this loop to send messages to the “SUBSCRIBE AND PUBLISH TO AN EMQX-MQTTBROKER” topic every second.
def publish(client):
msg_count = 0
while True:
msg_dict = {
'msg': msg_count
}
msg = json.dumps(msg_dict)
result = client.publish(TOPIC, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print("Send `{msg}` to topic `{topic}`".format(msg=msg, topic=TOPIC))
else:
print("Failed to send message to topic {topic}".format(topic=TOPIC))
msg_count += 1
time.sleep(1)
Write the EMQX-MQTT Subscribe to messages function
Secondly, we created the “on_message” callback function. After the client has received messages from the MQTT Broker, this function will be called. We’ll use this function to print the names of subscribed topics as well as the messages we’ve received.
def on_message(client, userdata, msg):
print("Received `{payload}` from `{topic}` topic".format(
payload=msg.payload.decode(), topic=msg.topic))
def connect_mqtt():
client = mqtt_client.Client(CLIENT_ID)
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT)
return client
The complete code for publishing and subscribing in python
Code for publish messages in python
import json
import random
import time
from paho.mqtt import client as mqtt_client
BROKER = ' p70e2b8e.en.emqx.cloud'
PORT = 12482
TOPIC = " SUBCRIBE AND PUBLISH TO AN EMQX-MQTT BROKER "
# generate client ID with pub prefix randomly
CLIENT_ID = "python-mqtt-tcp-pub-{id}".format(id=random.randint(0, 1000))
USERNAME = 'IOTEDU'
PASSWORD = 'public'
FLAG_CONNECTED = 0
def on_connect(client, userdata, flags, rc):
global FLAG_CONNECTED
if rc == 0:
FLAG_CONNECTED = 1
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code {rc}".format(rc=rc), )
def connect_mqtt():
client = mqtt_client.Client(CLIENT_ID)
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.connect(BROKER, PORT)
return client
def publish(client):
msg_count = 0
while True:
msg_dict = {
'msg': msg_count
}
msg = json.dumps(msg_dict)
result = client.publish(TOPIC, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print("Send `{msg}` to topic `{topic}`".format(msg=msg, topic=TOPIC))
else:
print("Failed to send message to topic {topic}".format(topic=TOPIC))
msg_count += 1
time.sleep(1)
def run():
client = connect_mqtt()
client.loop_start()
time.sleep(1)
if FLAG_CONNECTED:
publish(client)
else:
client.loop_stop()
if __name__ == '__main__':
run()
Code for subscribe to messages in python
import random
from paho.mqtt import client as mqtt_client
broker = 'p70e2b8e.en.emqx.cloud'
port = 12482
topic = " SUBCRIBE AND PUBLISH TO AN EMQX-MQTT BROKER "
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
# username = 'IOTEDU'
# password = 'public'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
# client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
Testing code for publishing and subscribing in Pycharm
Output of Published messages in Pycharm
Compile the code of publishing messages in pycharm, we will see that the client connects and publishes messages successfully.
Output of Subscribing to messages in Pycharm
Compile the code of subscribing messages in pycharm, we will see that the client connects successfully and receives the published messages successfully
However, we can see the connected clients in EMQX Dashboard.
Summary of our article
Finally, we’ve completed the use of the paho-mqtt client to connect to the free public MQTT broker, as well as implemented the connect, publish messages and subscribe to messages between the test client and the MQTT broker.