# Source code for driver.DriverClass

"""
Generic driver class for WM-Bus USB-dongle IM871A 
*************************************************
:Platform: Python 3.5.10 on Linux
:Synopsis: This module implements a class for communication with IM871A module.
:Authors: Steffen Breinbjerg, Thomas Serup
:Date: 21 October 2020


Version history
===============

- Ver 1.0: Set up driver.
- Ver 1.1: Implemented separate 'open pipe' handler. Added pipe path as 2nd argument.
- Ver 1.2: Implemented CRC-16 check.
- Ver 1.3: Logging exceptions to syslog instead of printing to console.
- Ver 1.4: No longer takes USB port as argument. Function for handling port is located in 'utils/Search_for_dongle'.  



Link Modes
===============
IM871A is able to run in different modes. Default mode is S2.


+-----------------------+----------+-------------------------------------------------------+
| Mode                  | Argument | Description                                           |
+=======================+==========+=======================================================+
| S1                    | s1       | Stationary, one way communication                     |
+-----------------------+----------+-------------------------------------------------------+
| S1-m                  | s1m      | S1 with shorter header                                |
+-----------------------+----------+-------------------------------------------------------+
| S2                    | s2       | Stationary, bidirectional communication               | 
+-----------------------+----------+-------------------------------------------------------+
| T1                    | t1       | Frequent transmit, one way communication              | 
+-----------------------+----------+-------------------------------------------------------+
| T2                    | t2       | Frequent transmit, bidirectional communication        |  
+-----------------------+----------+-------------------------------------------------------+
| C1, Telegram Format A | c1a      | Compact, one way communication. No fixed length       | 
+-----------------------+----------+-------------------------------------------------------+
| C1, Telegram Format B | c1b      | Compact, one way communication. Fixed length          | 
+-----------------------+----------+-------------------------------------------------------+
| C2, Telegram Format A | c2a      | Compact, bidirectional communication. No fixed length | 
+-----------------------+----------+-------------------------------------------------------+
| C2, Telegram Format B | c2b      | Compact, bidirectional communication. Fixed length    | 
+-----------------------+----------+-------------------------------------------------------+


"""
import serial as port   # type: ignore
import sys
import struct
import os
import subprocess
import errno
from binascii import hexlify
from struct import pack
from utils.log import log_info, log_error
from typing import Union
from utils.Search_for_dongle import im871a_port



# Definitions imported from WMBus_HCI_Spec_V1_6.pdf
IM871A_SERIAL_SOF = 0xA5
DEVMGMT_ID = 0x01
TEMP_MEM = 0x00
DEVMGMT_MSG_PING_REQ = 0x01
DEVMGMT_MSG_PING_RSP = 0x02
DEVMGMT_MSG_SET_CONFIG_REQ = 0x03
DEVMGMT_MSG_SET_CONFIG_RSP = 0x04
DEVMGMT_MSG_RESET_REQ = 0x07
DEVMGMT_MSG_RESET_RSP = 0x08


class IM871A:
    """
    Implementation of a driver class for the IM871A WM-Bus USB-dongle.

    Takes 1 required argument:
    - program_path: The path to where to put the pipe, e.g. the program directory.
      The pipe is created as '<program_path>/IM871A_pipe'.

    Optional argument:
    - logOnDestruct: If True (default), an info entry is logged when the object is destroyed.

    The serial port is opened with timeout=0, so every read issued by this class
    is non-blocking and the read/response loops below poll the port.
    """

    # Number of non-blocking reads performed while waiting for a response message.
    _RESPONSE_POLLS = 500

    def __init__(self, program_path, logOnDestruct=True):
        # Attributes used by close() are set first, so tearing down a partially
        # constructed object (aborted start-up, port that failed to open) is safe.
        self.fp = None                                  # Pointer to pipe
        self.IM871 = None                               # Serial object, set by __init_open()
        self.pipe = program_path + '/IM871A_pipe'       # Pipe name and place to put it
        try:
            self.Port = im871a_port()                   # Path to the USB-port used
        except Exception as err:
            log_error(err)
            sys.exit(1)
        self.__init_open(self.Port)                     # Initially creates and opens port
        self.__create_pipe(self.pipe)                   # Initially creates 'named pipe' file
        self.logOnDestruct = logOnDestruct

    def __create_pipe(self, pipe: str) -> bool:
        """
        Creates the named pipe for output when class is instantiated, if no pipe exists.
        Takes the path of the pipe as input.
        Returns True if the pipe was created, False if it already exists or creation failed.
        """
        try:
            os.mkfifo(pipe)
            return True
        except OSError as err:
            # If error is 'File exists' don't show error
            if err.errno != errno.EEXIST:
                log_error(err)
            return False

    def __init_open(self, Port: str) -> bool:
        """
        Initially creates and opens serial communication with the USB-dongle.
        Takes the port path as input.
        This function is only run once when class is instantiated.
        If port is closed after instantiation, use open() function to reopen port.
        """
        try:
            self.IM871 = port.Serial(port=Port, baudrate=57600, bytesize=8,
                                     parity=port.PARITY_NONE, stopbits=1, timeout=0)
            return True
        except (ValueError, port.SerialException) as err:
            log_error(err)
            return False

    def is_open(self) -> bool:
        """
        Returns True if the serial object exists and is opened.
        Returns False (and logs the error) if the serial object doesn't exist.
        """
        try:
            return self.IM871.isOpen()
        except AttributeError as err:
            log_error(err)
            return False

    def __string_to_hex(self, argument: str) -> int:
        """
        Convert 'mode' argument into its link mode value.
        Returns '0xa' if no valid input.
        Function is used in 'setup_linkmode()'.
        """
        switcher = {
            's1': 0x0,
            's1m': 0x1,
            's2': 0x2,
            't1': 0x3,
            't2': 0x4,
            'c1a': 0x6,
            'c1b': 0x7,
            'c2a': 0x8,
            'c2b': 0x9,
        }
        return switcher.get(argument, 0xa)

    def __CRC16_check(self, message: bytes) -> bool:
        """
        Argument must be the entire message from IM871-A as a hex-encoded byte string
        (two hex digits per byte, as produced by hexlify()).
        Function returns True if the check sum matches the expected CRC16 value.
        Messages too short to hold the SOF field and a CRC16 value return False.
        """
        # SOF field (2 hex digits) + CRC16 (4 hex digits) is the bare minimum
        if len(message) < 6:
            return False
        checksum = message[-4:]                 # Store the expected CRC16 value
        data = message[2:-4]                    # Removes SOF field and CRC16 value
        hex_radix = 16
        g = 0x8408                              # Generator polynomial, g(x)
        crc = 0xFFFF                            # Init value for CCITT CRC16
        for offset in range(0, len(data), 2):   # Loop over all bytes in message
            # Fold the byte into the low bits, then divide bit by bit (LSB first)
            crc ^= int(data[offset:offset + 2], hex_radix)
            for _ in range(8):
                if crc & 1:
                    crc = (crc >> 1) ^ g        # Get remainder from division
                else:
                    crc >>= 1                   # Just advance to next bit in division
        crc ^= 0xFFFF                           # Perform final complement
        # The CRC16 is compared as little-endian hex digits
        return checksum == hexlify(pack('<H', crc))

    def __pipe_data(self, data) -> bool:
        """
        Try to send data (followed by a line separator) to the already opened pipe.
        Returns True when data is sent to pipe.
        If writing fails the error is logged and the program exits with status 1,
        so this function never returns False.
        """
        try:
            self.fp.write(data + os.linesep)
            self.fp.flush()
            return True
        except Exception as err:
            log_error(err)
            sys.exit(1)

    def open_pipe(self) -> bool:
        """
        Open up the pipe for writing.
        Blocks until pipe is opened at the other end.
        """
        try:
            self.fp = open(self.pipe, "w")
            return True
        except IOError as err:
            log_error(err)
            return False

    def read_data(self) -> bool:
        """
        Read single dataframe from meters sending with the specified link mode.
        Send data into 'named pipe' (IM871A_pipe) as hex digits.
        Removes the first three bytes of the message before sending data to pipe.
        Returns False if the port can't be read.
        """
        while True:
            try:
                data = self.IM871.read(100)
            except (AttributeError, port.SerialException) as err:
                log_error(err)
                return False
            if not data:
                # Non-blocking read returned nothing; poll again
                continue
            if not self.__CRC16_check(hexlify(data)):
                # NOTE(review): messages failing the CRC16 check are dropped and
                # reading continues - confirm this matches the intended behaviour.
                continue
            # Output to named pipe
            return self.__pipe_data(data.hex()[6:])

    def __send_request(self, msg_id: int, payload=()) -> bool:
        """
        Write a device management request to the IM871A.
        Takes the Msg-ID and an optional payload (sequence of byte values) as input.
        The length field is derived from the payload.
        Returns False (and logs the error) if the request can't be written.
        """
        frame = [IM871A_SERIAL_SOF, DEVMGMT_ID, msg_id, len(payload)]
        frame.extend(payload)
        try:
            self.IM871.write(port.to_bytes(frame))
            return True
        except (AttributeError, port.SerialException) as err:
            log_error(err)
            return False

    def __await_response(self, msg_id: int) -> bool:
        """
        Poll the port for a device management response with the given Msg-ID.
        Returns True when the response is seen, False if reading fails or no
        response message arrives within the polling budget.
        """
        for _ in range(self._RESPONSE_POLLS):
            try:
                data = self.IM871.read(10)
            except port.SerialException as err:
                log_error(err)
                return False
            # Looking for Endpoint-ID (low nibble of 2nd byte) and Msg-ID (3rd byte)
            if len(data) >= 3 and (data[1] & 0x0F) == DEVMGMT_ID and data[2] == msg_id:
                return True
        # If no response message arrives
        return False

    def ping(self) -> bool:
        """
        Ping the WM-Bus module to check if it's alive.
        """
        if not self.__send_request(DEVMGMT_MSG_PING_REQ):
            return False
        return self.__await_response(DEVMGMT_MSG_PING_RSP)

    def reset_module(self) -> bool:
        """
        Reset the WM-Bus module.
        The reset will be performed after approx. 500ms.
        """
        if not self.__send_request(DEVMGMT_MSG_RESET_REQ):
            return False
        return self.__await_response(DEVMGMT_MSG_RESET_RSP)

    def setup_linkmode(self, mode: str) -> bool:
        """
        Setup link mode for communication with meter.
        Takes the link mode as argument.
        Returns False if the link mode is not valid.
        """
        # Converting mode-string to link mode value
        Mode = self.__string_to_hex(mode)
        if Mode == 0xa:
            return False
        if not self.__send_request(DEVMGMT_MSG_SET_CONFIG_REQ, (TEMP_MEM, 0x2, Mode)):
            return False
        return self.__await_response(DEVMGMT_MSG_SET_CONFIG_RSP)

    def open(self) -> bool:
        """
        Opens the port if port has been closed.
        It opens with the path given when instantiating the class.
        Also open the pipe.
        Returns True only if both the port and the pipe are open.
        """
        # Re-open port to IM871A (opening an already open port raises an error)
        try:
            if not self.IM871.isOpen():
                self.IM871.open()
        except (AttributeError, port.SerialException) as err:
            log_error(err)
            return False
        # Re-open pipe
        return self.open_pipe()

    def close(self):
        """
        Close the connection to IM871A, and the pipe.
        Safe to call when the pipe or the port was never opened.
        """
        pipe_handle = getattr(self, 'fp', None)
        serial_port = getattr(self, 'IM871', None)
        try:
            if pipe_handle is not None:
                pipe_handle.close()
        finally:
            # The port is closed even if closing the pipe fails
            if serial_port is not None:
                serial_port.close()

    def __del__(self):
        """
        Destructor for closing when going out of scope.
        Calls close() for closing port and pipe.
        """
        # logOnDestruct is missing if construction was aborted
        if getattr(self, 'logOnDestruct', False):
            log_info("IM871A-Driver stopped!")
        self.close()