Source code for mqtt.MqttClient

"""
MQTT-class for communication between Gateway and ReCalc/Cloud at ReMoni
***********************************************************************

:Platform: Python 3.5.10 on Linux
:Synopsis: This module implements a class for MQTT Client
:Authors: Steffen Breinbjerg, Janus Bo Andersen
:Latest update: 18 November 2020
:Version: 1.2

* **Ver. 1.0**: Setup MQTT class with loaded settings.
* **Ver. 1.1**: Implement support functions for return codes and better printout for on_connect.
* **Ver. 1.2**: Implemented TLS and better handling of reason codes.

"""

import paho.mqtt.client as mqtt
from utils.load_settings import load_settings
from typing import Tuple
import ssl


[docs]class MqttClient: """ This class wraps a Paho MQTT client and sets it up using a profile from settings/secrets.yaml. On instantiation, all setup steps run up to and including connect. """ # This method is the same for all instances of the class
[docs] @staticmethod def on_connect(client: 'mqtt.Client', userdata, flags, rc): """ on_connect callback rc argument value meaning: * 0: Connection successful * 1: Connection refused - incorrect protocol version * 2: Connection refused - invalid client identifier * 3: Connection refused - server unavailable * 4: Connection refused - bad username or password * 5: Connection refused - not authorised 6-255: Currently unused. """ # TODO: Implement logging potentially print("Connected " + client._client_id.decode() + " with result code " + str(rc) + ": " + connection_rc_str(rc))
# For outputting log messages to console @staticmethod def __on_log(client, userdata, level, buf): # TODO: Implement logging if needed pass # print("log: ", buf) # By default, do nothing on_message @staticmethod def __on_message_default(client, userdata, message): pass # To track QoS2 handshaking @staticmethod def __on_publish(client, userdata, mid): pass @staticmethod def on_disconnect(client, userdata, flads, rc=0): pass @staticmethod def on_subscribe(client, obj, mid, granted_qos): pass
[docs] def __init__(self, name, on_message, on_publish, param_settings='mqtt_local'): """Handles all setup and connection when object is initialized. :param str name: Client ID (must be unique) :param function_ptr on_message: On message callback function :param function_ptr on_publish: On publish callback function :param str param_settings: Profile from secrets.yaml to use """ settings = load_settings()[param_settings] self.client = mqtt.Client(client_id=name, transport=settings["transport"]) self.client.username_pw_set(settings["username"], str(settings["password"])) if settings["tls"]: self.client.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS) self.client.on_connect = MqttClient.on_connect self.client.on_message = on_message self.client.on_disconnect = MqttClient.on_disconnect self.client.on_subscribe = MqttClient.on_subscribe self.client.on_publish = on_publish # In production, let's consider disabling logging or routing to a file self.client.on_log = MqttClient.__on_log self.client.enable_logger() # This ensures, that there is some sort of goodbye on losing connection # self.client.will_set(name, will_message) # Connect immediately self.client.connect(settings["ip"], port=settings["port"])
[docs] def loop_start(self): """ Start loop in new thread. This thread will handle disconnects to MQTT broker. """ return self.client.loop_start()
[docs] def loop_stop(self): """ Stops the loop thread. """ return self.client.loop_stop()
# Note: Removed qos=2 - This may be added again.
[docs] def publish(self, topic, payload): """ :param str topic: Topic to publish to :param str payload: Message payload :returns: (result, mid) :rtype: tuple - result: 0 on success, 4 if no connection - mid is message id for tracked message """ return self.client.publish(topic, payload)
[docs] def subscribe(self, topic, qos): """ :param str topic: Topic to subscribe to. :param int qos: Quality of service level (0, 1, 2). :returns: (result, mid) :rtype: tuple - result: 0 on success, 4 if no connection - mid is message id for tracked message """ return self.client.subscribe(topic, qos)
[docs] def loop_forever(self): """ Will block the program, and only handle callback functions and disconnects. """ return self.client.loop_forever(retry_first_connection=False)
[docs] def disconnect(self): """ Gracefully disconnects from server. """ return self.client.disconnect()
def donothing_onmessage(client, userdata, message): pass def donothing_onpublish(client, userdata, mid): pass
[docs]def publish_rc_str(rc: Tuple) -> str: """ Returns text description of return codes for publishing a message. Based on return code spec. from Paho MQTT. Alternatively, use mqtt.error_string(). """ rc_str = { mqtt.MQTT_ERR_SUCCESS: "Successful", mqtt.MQTT_ERR_NO_CONN: "No connection" } try: reason = rc_str[rc[0]] except KeyError as e: reason = "unknown" return reason
[docs]def publish_rc_bool(rc: Tuple) -> bool: """ Returns True if message was sent correctly, otherwise False. Based on return code spec. from Paho MQTT. The rc tuple is (reason_code, message_id). Alternatively, use mqtt.error_string(). """ return rc[0] == mqtt.MQTT_ERR_SUCCESS
[docs]def connection_rc_str(rc: Tuple) -> str: """ Returns text description of return codes for a connection attempt to server. Based on return code spec. from Paho MQTT. Alternatively, use mqtt.connack_string(). """ rc_str = { mqtt.CONNACK_ACCEPTED: "Connection successful", mqtt.CONNACK_REFUSED_PROTOCOL_VERSION: "Connection refused - incorrect protocol version", mqtt.CONNACK_REFUSED_IDENTIFIER_REJECTED: "Connection refused - invalid client identifier", mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE: "Connection refused - server unavailable", mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD: "Connection refused - bad username or password", mqtt.CONNACK_REFUSED_NOT_AUTHORIZED: "Connection refused - not authorised" } return rc_str[rc]