import time from serial import Serial import colorama import struct from colorama import Fore, Style from typing import Sequence, Union from binascii import b2a_hex DEFAULT_MB_CRC_TABLE = ( 00000, 49345, 49537, 320, 49921, 960, 640, 49729, 50689, 1728, 1920, 51009, 1280, 50625, 50305, 1088, 52225, 3264, 3456, 52545, 3840, 53185, 52865, 3648, 2560, 51905, 52097, 2880, 51457, 2496, 2176, 51265, 55297, 6_336, 6528, 55617, 6912, 56257, 55937, 6720, 7680, 57025, 57217, 8000, 56577, 7616, 7296, 56385, 5120, 54465, 54657, 5440, 55041, 6080, 5760, 54849, 53761, 4800, 4992, 54081, 4352, 53697, 53377, 4160, 61441, 12480, 12672, 61761, 13056, 62401, 62081, 12864, 13824, 63169, 63361, 14144, 62721, 13760, 13440, 62529, 15360, 64705, 64897, 15680, 65281, 16320, 16000, 65089, 64001, 15040, 15232, 64321, 14592, 63937, 63617, 14400, 10240, 59585, 59777, 10560, 60161, 11200, 10880, 59969, 60929, 11968, 12160, 61249, 11520, 60865, 60545, 11328, 58369, 9408, 9600, 58689, 9984, 59329, 59009, 9792, 8704, 58049, 58241, 9024, 57601, 8640, 8320, 57409, 40961, 24768, 24960, 41281, 25344, 41921, 41601, 25152, 26112, 42689, 42881, 26432, 42241, 26048, 25728, 42049, 27648, 44225, 44417, 27968, 44801, 28608, 28288, 44609, 43521, 27328, 27520, 43841, 26880, 43457, 43137, 26688, 30720, 47297, 47489, 31040, 47873, 31680, 31360, 47681, 48641, 32448, 32640, 48961, 32000, 48577, 48257, 31808, 46081, 29888, 30080, 46401, 30464, 47041, 46721, 30272, 29184, 45761, 45953, 29504, 45313, 29120, 28800, 45121, 20480, 37057, 37249, 20800, 37633, 21440, 21120, 37441, 38401, 22208, 22400, 38721, 21760, 38337, 38017, 21568, 39937, 23744, 23936, 40257, 24320, 40897, 40577, 24128, 23040, 39617, 39809, 23360, 39169, 22976, 22656, 38977, 34817, 18624, 18816, 35137, 19200, 35777, 35457, 19008, 19968, 36545, 36737, 20288, 36097, 19904, 19584, 35905, 17408, 33985, 34177, 17728, 34561, 18368, 18048, 34369, 33281, 17088, 17280, 33601, 16640, 33217, 32897, 16448) class NoResponseError(IOError): pass class ChecksumError(IOError): pass class MBError(IOError): pass class ModbusMixin: def print_hex(self, text: str, data: bytes): print(text, *tuple(map(lambda x: '0x{:02X}'.format(x), (i for i in data)))) class Modbus(ModbusMixin): # MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE REF_TYPE = 6 MB_TIMEOUT: float = 0.5 # 0.05 MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE MB_DEBUG: bool = False MB_TRIES: int = 3 # def __init__(self, tty: str, brate: int, address: int): def __init__(self, serial: Serial, address: int): self.serial = serial self.address = address @classmethod def test(cls): print(type(cls.MB_CRC_TABLE)) @classmethod def _crc(cls, data: bytes) -> bytes: crc = 0xFFFF crc_table = cls.MB_CRC_TABLE for char in data: crc = (crc >> 8) ^ crc_table[(crc ^ char) & 0xFF] return crc.to_bytes(2, 'little') def raw_communicate(self, data: bytes, predicted_length:int = -1) -> bytes: """Send request and return it back with checksum""" if self.MB_DEBUG: self.print_hex('Request:', data) self.serial.write(data) response_bytes = bytearray() start_time = time.time() while True: b = self.serial.read(1) if len(b): new_byte = bytearray(b) response_bytes.extend(new_byte) elif time.time() - start_time > self.MB_TIMEOUT: break if len(response_bytes) == predicted_length: break if self.MB_DEBUG: self.print_hex('Responce:', response_bytes) return response_bytes def communicate(self, request: bytes, predicted_length: int = -1) -> bytes: """Send request and return rewponse after checksum check""" pl = predicted_length + 2 if predicted_length != -1 else -1 response = self.raw_communicate(request + self._crc(request), pl) if len(response) == 0: raise NoResponseError('No response frome module') crc_received = response[-2:] crc_calculated = self._crc(response[:-2]) if crc_received != crc_calculated: raise ChecksumError("CRC check failed (got: 0x{}, calculated: 0x{})".format( b2a_hex(crc_received).decode().upper(), b2a_hex(crc_calculated).decode().upper())) return response def mb_func(self, code: int, req: bytes, predicted_response_length: int = -1): """Call typical MB function with address and function code check, return just response, no header and checksum included""" error = None for i in range(self.MB_TRIES): try: pl = predicted_response_length + 2 if predicted_response_length != -1 else -1 response = self.communicate(b''.join((self.address.to_bytes(1, 'big'), code.to_bytes(1, 'big'), req)), pl) if response[0] != self.address: raise MBError("Modbus address mismatch in request and response ({}/{})".format( self.address, response[0])) elif response[1] != code: raise MBError("Modbus function opcode mismatch in request and response ({}/{})".format( code, response[1])) return response[2:-2] except (MBError, ChecksumError, NoResponseError) as e: if self.MB_DEBUG: print(e) error = e time.sleep(0.5) raise error def _read_registers(self, opcode: int, address: int, count: int, raw: bool = False) -> Union[Sequence[int], bytes]: """Read values of inpurt/holding registers from device""" response = self.mb_func(opcode, address.to_bytes(2, 'big') + count.to_bytes(2, 'big'), 1 + 2*count) try: if raw: return response[1:] inp_count = response[0] // 2 return tuple(int.from_bytes(response[i:i + 2], 'big') for i in range(1, inp_count*2, 2)) except KeyError: raise MBError("Incorrect response payload") # 0x03 def read_holding_registers(self, address: int, count: int) -> Sequence[int]: return self._read_registers(3, address, count) def read_holding_registers_raw(self, address: int, count: int) -> bytes: return self._read_registers(3, address, count, True) def read_uint32_holding(self, address: int) -> int: """Read 32-bit integer from holding registers""" return struct.unpack('>I', self.read_holding_registers_raw(address, 2))[0] def read_uint64_holding(self, address: int) -> int: """Read 64-bit integer from holding registers""" return struct.unpack('>Q', self.read_holding_registers_raw(address, 4))[0] # 0x10 def write_holding_registers_raw(self, address:int, values: bytes): """Write 16-bit integers to holding registers on device""" request = bytearray(address.to_bytes(2, 'big')) request += (len(values) // 2).to_bytes(2, 'big') request += len(values).to_bytes(1, 'big') request += values response = self.mb_func(16, bytes(request), 4) if request[:4] != response: raise MBError('Incorrect response payload') # def write_holding_register(self, address: int, value: int): """Write 16-bit integer to register on device""" request = address.to_bytes(2, 'big') + value.to_bytes(2, 'big') response = self.mb_func(6, request, len(request)) if request != response: raise MBError('Incorrect response payload') # 0x14 def read_file_record(self, rec_type: int, channel_number: int, rec_index: int, rec_num: int): """Read file record rec_type - код записи, 0x06 - архив, 0x07 - журнал channel_number - номер канала архива (0 если это журнал) rec_index - порядковый номер (индекс) записи rec_num - количество записей """ request = bytearray((self.address, 0x14, 7, rec_type)) request += rec_index.to_bytes(2, 'big') request += channel_number.to_bytes(2, 'big') request += rec_num.to_bytes(2, 'big') return self.raw_communicate(request + self._crc(request)) def write_uint32(self, address: int, value: int): """Write 32-bit integer to holding register""" self.write_holding_registers_raw(address, struct.pack('>I', value)) def write_uint64(self, address: int, value: int): """Write 64-bit integer to holding register""" self.write_holding_registers_raw(address, struct.pack('>Q', value)) def test_send(self, data: bytes): while True: self.serial.write(data) time.sleep(1) def send_recv(self, data, number): self.serial.write(data) # while True: # self.serial.write(data) # recv = self.serial.read_all() # print(recv) # print(recv.decode('utf_8')) # time.sleep(1) def main(): ADDR = 1 colorama.init() dev = Modbus('COM22', 115200, ADDR) # print(dev.read_holding_registers(0x0100, 1)) # print(dev.read_uint32_holding(0x0100)) # time.sleep(1) if __name__ == '__main__': main()