"""
Parse Kamstrup OmniPower wm-bus telegrams
*****************************************
:platform: Python 3.5.10 on Linux, OS X
:synopsis: Implements parsing functionality for C1 telegrams and log handling for data series
:author: Janus Bo Andersen
:date: 28 October 2020
Version history
===============
- Ver 1.0: Set up parser and decryption. Janus.
- Ver 2.0: Implement CRC16, timezone. Janus.
- Ver 2.1: More robust exception handling, parse ELL-SN. Janus
- Ver 2.2: Utilize new MeterMeasurement.is_empty() in validation during parsing. Janus
Overview
========
- This module implements parsing for the Kamstrup OmniPower meter, single-phase.
- The meter sends wm-bus C1 (compact one-way) telegrams.
- Telegrams on wm-bus are little-endian, i.e. LSB first.
- The meter sends 1 long and 7 short telegrams, and then repeats.
- Long telegrams include data record headers (DRH) and data, that is DIF/VIF codes + data.
- Short telegrams only include data.
Telegram fields
===============
In a telegram C1 telegram, the data fields are:
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
| # | Byte# | Bytes | M-bus field | Description | Expected value (little-endian) |
+===+=======+=======+=============+=============================================+=================================+
| 0 | 0 | 1 | L | Telegram length | 0x27 (39 bytes, short frame), or|
| | | | | | 0x2D (45 bytes, long frame) |
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
| 1 | 1 | 1 | C | Control field (type and purpose of message) | 0x44 (SND_NR) |
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
| 2 | 2-3 | 2 | M | Manufacturer ID (official ID code) | 0x2D2C (KAM) |
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
| 3 | 4-7 | 4 | A | Address (meter serial number) | 0x57686632 (big-endian:32666857)|
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
| 4 | 8 | 1 | Ver. | Version number of the wm-bus firmware | 0x30 |
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
| 5 | 9 | 1 | Medium | Type / medium of meter | 0x02 (Electricity) |
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
| 6 | 10 | 1 | CI | Control Information | 0x8D (Extended Link Layer 2) |
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
| 7 | 11 | 1 | CC | Communication Control | 0x20 (Slow response sync.) |
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
| 8 | 12 | 1 | ACC | Access field | Varies |
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
| 9 | 13-16 | 4 | AES CTR / SN| AES counter (Session number) | Varies, see below |
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
|10 | 17-39 | 23 | Data | Contains AES-encrypted data frame, | Encrypted data |
| | 17-45 | 29 | | varying for short and long frames | |
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
|11 | | 2 | CRC16 | CRC16 check | |
+---+-------+-------+-------------+---------------------------------------------+---------------------------------+
The fields 0-9 of the telegram can be unpacked using the little-endian format `<BBHIBBBBB`, where
- `<` marks little-endian,
- `B` is an unsigned 1 byte (char),
- `H` is an unsigned 2 byte (short),
- `I` is an unsigned 4 byte (int)
AES counter / Session number
============================
The AES_CTR / Extended Link Layer SN field (ELL-SN) is is structured as per EN 13757-4, p. 54.
The example shows ELL-SN value of 0x01870320 (little-endian) -> 0x20038701 (big-endian). Bit readout:
+---------+-----------+-----------+-----------+-----------+
|Byte: | 3 | 2 | 1 | 0 |
+=========+===========+===========+===========+===========+
|Hex: | 0x20 | 0x03 | 0x87 | 0x01 |
+---------+-----------+-----------+-----------+-----------+
|Binary: | 0010 0000 | 0000 0011 | 1000 0111 | 0000 0001 |
+---------+-----------+-----------+-----------+-----------+
Should give the following slicing and interpretation:
+---------+-------+---------------------------------+---------+
|Bits: | 31-29 | 28-04 | 03-00 |
+=========+=======+=================================+=========+
|Field: | ENC | Time | Session |
+---------+-------+---------------------------------+---------+
|Example: | 001 | 0 0000 0000 0011 1000 0111 0000 | 0001 |
+---------+-------+---------------------------------+---------+
|Hex: | 0x1 | 0x3870 (14448) | 0x1 |
+---------+-------+---------------------------------+---------+
- ENC (Encryption): 0 -> No encryption, 1 -> AES-CTR mode, higher -> reserved.
- Time: Minute counter since 01/01/2013 (?), or since meter started, requires RTC set on meter. \
So time measurement was taken about 10 days after the meter was started.
- Session: Incremented by meter for each transmission, unless using partial/fractured frames.
Parsing. The whole ELL-SN field is read out and masks are used to extract fields.
Telegram examples
=================
Encrypted short telegrams:
+---+---+------+----------+---+---+---+---+---+----------+----------------------------------------------------+------+
|L |C |M |A |Ver|Med|CI |CC |ACC|AES CTR |Encrypted payload |CRC 16|
+===+===+======+==========+===+===+===+===+===+==========+====================================================+======+
|27 |44 |2D 2C |5768 6632 |30 |02 |8D |20 |2E |2187 0320 |D3A4F149 B1B8F578 3DF7434B 8A66A557 86499ABE 7BAB59 |xxxx |
+---+---+------+----------+---+---+---+---+---+----------+----------------------------------------------------+------+
|27 |44 |2d 2c |5768 6632 |30 |02 |8d |20 |63 |60dd 0320 |c42b87f4 6fc048d4 2498b44b 5e34f083 e93e6af1 617631 |3d9c |
+---+---+------+----------+---+---+---+---+---+----------+----------------------------------------------------+------+
|27 |44 |2d 2c |5768 6632 |30 |02 |8d |20 |8e |11de 0320 |188851bd c4b72dd3 c2954a34 1be369e9 089b4eb3 858169 |494e |
+---+---+------+----------+---+---+---+---+---+----------+----------------------------------------------------+------+
Encrypted long telegrams:
+---+---+------+----------+---+---+---+---+---+----------+------------------------------------------------------------------+------+
|L |C |M |A |Ver|Med|CI |CC |ACC|AES CTR |Encrypted payload |CRC 16|
+===+===+======+==========+===+===+===+===+===+==========+==================================================================+======+
|2D |44 |2D 2C |5768 6632 |30 |02 |8D |20 |64 |61DD 0320 |38931d14 b405536e 0250592f 8b908138 d58602ec a676ff79 e0caf0b1 4d |0e7d |
+---+---+------+----------+---+---+---+---+---+----------+------------------------------------------------------------------+------+
Decryption
==========
- The wireless m-bus on OmniPower uses AES-128 Mode CTR (if enabled, otherwise no encryption).
- See EN 13757-4:2019, p. 54, as ELL (Ext. Link-Layer) with ENC = 0x1 => AES-CTR.
- A decryption prefix (initial counter block) is built from some of the fields.
- See table 54 on p. 55 of EN 13757-4:2019.
It can be packed using the format `<HIBBBIB`.
+-----+---------+---+---+---+---------+-----+----+
|M |A |Ver|Med|CC |AES_CTR |FN |BC |
+=====+=========+===+===+===+=========+=====+====+
|2D2C |57686632 |30 |02 |20 |21870320 |0000 |00 |
+-----+---------+---+---+---+---------+-----+----+
- AES Prefix (initialization vector): Fields M, ..., AES_CTR, FN.
- FN: frame number (frame # sent by meter within same session number, in case of multi-frame transmissions).
- AES Counter: BC
- BC: Block counter (encryption block number, counts up for each 16 byte block decrypted within the telegram).
Decrypted payload examples
==========================
The interpretation of the fields in the OmniPower is
+----------+---------------+----------------+--------------------+--------------------------------------+-----------+
| Field | Kamstrup name | Data fmt (DIF) | Value type (VIF/E) | VIF/E meaning | DIF VIF/E |
+==========+===============+================+====================+======================================+===========+
| Data 1 | A+ | 32-bit uint | Energy, 10^1 Wh | Consumption from grid, accum. | 04 04 |
+----------+---------------+----------------+--------------------+--------------------------------------+-----------+
| Data 2 | A- | 32-bit uint | Energy, 10^1 Wh | Production to grid, accum. | 04 84 3C |
+----------+---------------+----------------+--------------------+--------------------------------------+-----------+
| Data 3 | P+ | 32-bit uint | Power, 10^0 W | Consumption from grid, instantan. | 04 2B |
+----------+---------------+----------------+--------------------+--------------------------------------+-----------+
| Data 4 | P- | 32-bit uint | Power, 10^0 W | Production to grid, instantan. | 04 AB 3C |
+----------+---------------+----------------+--------------------+--------------------------------------+-----------+
Transport layer control information fields (TPL-CI), ref. EN 13757-7:2018, p. 17, introduce Application Layer (APL) as:
- 0x78 with full frames (Response from device, full M-Bus frame)
- 0x79 with compact frames (Response from device, M-Bus compact frame)
Data integrity check (CRC16)
____________________________
The first 2 bytes (16 bits) of a payload is always the CRC16 value of the sent message.
This value must be checked versus CRC16 calculated on the received payload.
Decrypted short telegram payload
________________________________
+------+-------+----------------+-----------+---------+---------+---------+---------+
|CRC16 |TPL-CI |Data fmt. sign. |CRC16 data |Data 1 |Data 2 |Data 3 |Data 4 |
+======+=======+================+===========+=========+=========+=========+=========+
|1170 |79 |138C |4491 |CE000000 |00000000 |03000000 |00000000 |
+------+-------+----------------+-----------+---------+---------+---------+---------+
Measurement data starts at byte 7, and can easily be extracted using `<IIII` little-endian format.
In this example, 206 10^1 Wh (2.06 kWh) have been consumed, and the current power draw is 3 10^0 W (0.003 kW).
Decrypted long telegram payload
_______________________________
In this kind of telegram, the DRHs are included.
+------+-------+----------+---------+---------------+---------+----------+---------+----------+---------+
|CRC16 |TPL-CI |DIF/VIF 1 |Data 1 |DIF/VIF/VIFE 2 |Data 2 |DIF/VIF 3 |Data 3 |DIF/VIF 4 |Data 4 |
+======+=======+==========+=========+===============+=========+==========+=========+==========+=========+
|9831 |78 |04 04 |D7000000 |04 84 3C |00000000 |04 2B |03000000 |04 AB 3C |00000000 |
+------+-------+----------+---------+---------------+---------+----------+---------+----------+---------+
Extraction is slightly more complex, requiring either a longer parsing pattern or perhaps a regex.
In this example, 215 10^1 Wh (2.15 kWh) have been consumed, and the current power draw is 3 10^0 W (0.003 kW).
"""
from binascii import hexlify, unhexlify
from struct import *
from Crypto.Cipher import AES
from Crypto.Util import Counter
from datetime import datetime
import json
import re
from typing import List, Tuple
# And our own implementation
from meter.MeterMeasurement import MeterMeasurement, Measurement
from utils.timezone import ZuluTime
from utils.crc16_wmbus import crc16_wmbus, crc16_check, CrcCheckException
# Set timezone
zulu_time = ZuluTime()
[docs]class C1Telegram:
"""
Implements capture of data fields for a C1 telegram from OmniPower
"""
# Define slices for C1 telegram
# These byte positions are multiplied by *2 to get hex digit count
header_slice = slice(0, 17*2) # Byte 0-16 (inclusive)
ell_sn_slice = slice(13*2, 17*2) # Bytes 13-16 (inclusive)
payload_start_byte = 17
im871a_crc_bytes = 2
def __init__(self, telegram: bytes) -> None:
"""
Take a telegram (bytestring with hex values) and parses into fields
"""
try:
# Pull out non-encrypted header. Will discard this variable after parsing it
header = telegram[self.header_slice]
# Encrypted part of telegram, keep after parsing
self.encrypted = telegram[self.payload_start_byte * 2 : len(telegram) - self.im871a_crc_bytes * 2]
# The payload message is set as an empty bytestring until decrypted
self.decrypted = bytes()
# The CRC16 from the IM871-A dongle, always at the end
self.im871_crc = telegram[len(telegram) - self.im871a_crc_bytes * 2 : ]
# Unpack header values into a tuple
header_values = unpack('<BBHIBBBBBI', unhexlify(header))
# Extract fields by field numbers, see first table in documentation
self.L = header_values[0]
self.C = header_values[1]
self.M = header_values[2]
self.A = header_values[3]
self.version = header_values[4]
self.medium = header_values[5]
self.CI = header_values[6]
self.CC = header_values[7]
self.ACC = header_values[8]
self.AES_CTR = header_values[9]
# The ELL-SN (AES_CTR) field is a composite 4-byte session number (SN) field.
# Including encryption method, minute counter and session number
ell_sn = self.parse_ell_sn(telegram[self.ell_sn_slice])
self.SN = {
"ENC": ell_sn[0],
"Time": ell_sn[1],
"Session": ell_sn[2],
}
# Store original hex values as big-endian inside strings
# for comparison with human-readable values
self.big_endian = {
'L': hexlify(pack('>B', self.L)),
'C': hexlify(pack('>B', self.C)),
'M': hexlify(pack('>H', self.M)),
'A': hexlify(pack('>I', self.A)),
'version': hexlify(pack('>B', self.version)),
'medium': hexlify(pack('>B', self.medium)),
'CI': hexlify(pack('>B', self.CI)).upper(),
'CC': hexlify(pack('>B', self.CC)).upper(),
'ACC': hexlify(pack('>B', self.ACC)),
'AES_CTR': hexlify(pack('>I', self.AES_CTR)),
}
# Compute decryption prefix
# See recipe in documentation, currently FN=0
self.prefix = pack('<HIBBBIB', self.M, self.A, self.version, self.medium, self.CC, self.AES_CTR, 0)
except Exception as e:
# TODO: Specify error type caught and handle
# Raise exception for upstream handling. and propagate the existing exception
raise TelegramParseException("Failed to parse.") from e
@staticmethod
def parse_ell_sn(sn_field: bytes) -> Tuple[int, ...]:
# Get 32 bit from little-endian format
ell_sn = unpack('<I', unhexlify(sn_field))[0]
# Get bits using masks
enc = (ell_sn & (0xe << 28)) >> 29 # Get bits 31-29
time = (ell_sn & (0x1ffffff << 4)) >> 4 # Get bits 28-04
session = ell_sn & 0xf # Get bits 03-00
return enc, time, session
[docs] def decrypt_using(self, meter: 'OmniPower') -> bool:
"""
Decrypts a telegram and inserts it into telegram (self)
Uses a meter object and its key to perform decryption.
Requires instantiated OmniPower meter with valid AES-key.
"""
try:
# Store decrypted value in field decrypted
self.decrypted = meter.decrypt(self)
return True
except AesKeyException as e:
# Missing key or malformed
print(e)
return False
except CrcCheckException as e:
# Bad message received, CRC check fail
print(e)
return False
except:
# TODO: Blanket broad exception - bad idea, test and fix!
return False
[docs]class OmniPower:
"""
Implementation of our OmniPower single-phase meter
Passed values are hex encoded as string, e.g. '2C2D' for value 0x2C2D.
"""
# Required AES key digits (128 bit -> 16 bytes -> 32 hex digits)
# Anything else must be a malformed key
aes_req_key_digits = 2*16
# Byte limit for short, data-only telegrams from OmniPower.
# Larger telegrams also contain DIF/VIF information
short_telegram_lim = 39
# TPL-CI field value for different C1 frame formats
tpl_ci_field = slice(4, 6)
tpl_ci_compact = b'79' # This kind is data-only
tpl_ci_drh = b'78' # This kind has DIF/VIF information too
# Short telegram format is contiguous frame of 4 32-bit uints
# The data begins at byte 7 of the payload
short_telegram_fmt = '<IIII'
short_telegram_data_slice = slice(7*2, None)
# Long telegram format contains DIF/VIF/VIF followed by values.
# The DIF 04 specifies a 32-bit uint, so the little-endian format '<I' is used.
long_telegram_fmt = (('0404', '<I'),
('04843C', '<I'),
('042B', '<I'),
('04AB3C', '<I'))
def __init__(self,
name: str = 'Kamstrup OmniPower one-phase',
meter_id: str = '32666857',
manufacturer_id: str = '2C2D',
medium: str = '02',
version: str = '30',
aes_key: str = '9A25139E3244CC2E391A8EF6B915B697'):
self.name = name # Meter nickname
self.meter_id = meter_id # serial number of the meter
self.manufacturer_id = manufacturer_id # Kamstrup manufacturer ID
self.medium = medium # Medium/type of meter, e.g 0x02 is electricity
self.version = version # Firmware version for the wm-bus interface
self.AES_key = aes_key # 128-bit AES encryption key
self.measurement_log = [] # type: List['MeterMeasurement']
[docs] def is_this_my(self, telegram: 'C1Telegram') -> bool:
"""
Check whether a given telegram is from this meter by comparing meter setting to telegram
"""
# Comparison is done on lowercase strings and big-endian values, e.g. 2c2d == 2c2d
if (telegram.big_endian['A'] == self.meter_id.lower().encode()) and \
(telegram.big_endian['M'] == self.manufacturer_id.lower().encode()) and \
(telegram.big_endian['version'] == self.version.lower().encode()) and \
(telegram.big_endian['medium'] == self.medium.lower().encode()):
return True
else:
return False
[docs] def decrypt(self, telegram: 'C1Telegram') -> bytes:
"""
Decrypt a telegram. Returns decrypted bytes.
Raises CrcCheckException if CRCs do not match after decryption.
Requires:
- the prefix from the telegram (telegram.prefix), and
- the encryption key stored in the meter object.
Decrypts the data stored in field telegram.encrypted
"""
# Check that 16 byte (128 bit) key has 32 hex digits
if len(self.AES_key) != self.aes_req_key_digits:
raise AesKeyException("Bad key length, {}".format(len(self.AES_key)))
# Get relevant variables
key_in = self.AES_key # UTF-8 formatted
ciphertext = telegram.encrypted # ASCII string
prefix = telegram.prefix # bytestring
# Make binary representations
key = unhexlify(key_in) # type: bytes
ciphertext = unhexlify(ciphertext)
# Create cryptographic objects to decrypt using AES-CTR method
counter = Counter.new(nbits=16, prefix=prefix, initial_value=0x0000)
cipher = AES.new(key, AES.MODE_CTR, counter=counter)
# Perform decryption
decryption = hexlify(cipher.decrypt(ciphertext))
# Will raise exception if CRC check failed, to be caught upstream
crc16_check(decryption)
return decryption
[docs] @classmethod
def unpack_short_telegram_data(cls, data: bytes) -> Tuple[int, ...]:
"""
Short C1 telegrams only contain field data values, no information about DIF/VIF
"""
# Extract the measurements into a 4-tuple
return unpack(cls.short_telegram_fmt, unhexlify(data[cls.short_telegram_data_slice]))
[docs] @classmethod
def unpack_long_telegram_data(cls, data: bytes) -> Tuple[int, ...]:
"""
Long C1 telegrams contain DIF/VIF information and field data values
"""
# Make return value vector with exactly as many zeros as there are expected fields.
# So if one field is not found, a zero is returned in its place
return_val = [0] * len(cls.long_telegram_fmt)
for i, fmt in enumerate(cls.long_telegram_fmt):
# Build Regex, which searches for the code and groups the 8 subsequent characters
pattern = fmt[0].lower() + "(.{8})"
# Search for the pattern in the data, which we first format to an UTF-8 string with decode
match = re.search(pattern, data.decode())
if match:
# Unpack the found specific 8 characters (4 bytes = 32 bits)
# match.group(1) contains these 8 characters
# unpack returns a 1-tuple, from which we grab the single integer element with [0]
# print("Found DIF/VIF/VIFE field {} in data records (long) telegram".format(fmt[0]))
return_val[i] = unpack(fmt[1], unhexlify(match.group(1).encode()))[0]
# Finally, return a tuple that we can use to convert and log measurements
return tuple(return_val)
[docs] def add_measurement_to_log(self, measurement: MeterMeasurement) -> bool:
"""
Pushes a new measurement to the tail end of the log
"""
try:
self.measurement_log.append(measurement)
return True
except:
# TODO: Specify potential exceptions / errors
return False
[docs] def process_telegram(self, telegram: 'C1Telegram') -> bool:
"""
Does entire processing chain for a telegram, including adding to log.
Returns True if processing is OK and added to log OK.
Otherwise False.
"""
# Confirm that the telegram belongs to this meter, then
# try to decrypt the telegram
if self.is_this_my(telegram) and telegram.decrypt_using(self):
# extract measurements
measurement_frame = self.extract_measurement_frame(telegram)
# Confirm not empty
if not measurement_frame.is_empty():
# then add measurement frame to log and return True if okay
return self.add_measurement_to_log(measurement_frame)
else:
# Is empty. No data was added, so nothing to be logged
return False
else:
# This is not my telegram or telegram failed to decrypt
return False
[docs] def dump_log_to_json(self) -> str:
"""
Returns a JSON string of all measurement frames in log, with an incremented number for each observation.
"""
dump = {}
# Fill object
[dump.update({str(i): log_i.as_dict()}) for i, log_i in enumerate(self.measurement_log)]
# Return JSON-string
return json.dumps(dump)
[docs]class AesKeyException(Exception):
"""
Use this to raise an exception when an AES key is missing or wrong length.
"""
def __init__(self, exception_message: str):
self.exception_message = exception_message
# Invoke constructor for base class
super().__init__(self.exception_message)
def __str__(self):
# String representation of exception if printed
return "AES key missing from meter object or malformed: {}.".format(self.exception_message)
[docs]class TelegramParseException(Exception):
"""
Use this to raise an exception if a bytestream telegram fails to parse into C1 format.
"""
def __init__(self, exception_message: str):
self.exception_message = exception_message
# Invoke constructor for base class
super().__init__(self.exception_message)
def __str__(self):
# String representation of exception if printed
return "Byte object could not be parsed as a C1 telegram: {}.".format(self.exception_message)