| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 | 
							- import time
 
- from serial import Serial
 
- import colorama
 
- import struct
 
- from colorama import Fore, Style 
 
- from typing import Sequence, Union
 
- from binascii import b2a_hex
 
- DEFAULT_MB_CRC_TABLE = (
 
-     00000, 49345, 49537,   320, 49921,   960,   640, 49729, 50689,  1728,  1920,
 
-     51009,  1280, 50625, 50305,  1088, 52225,  3264,  3456, 52545,  3840, 53185,
 
-     52865,  3648,  2560, 51905, 52097,  2880, 51457,  2496,  2176, 51265, 55297,
 
-     6_336,  6528, 55617,  6912, 56257, 55937,  6720,  7680, 57025, 57217,  8000,
 
-     56577,  7616,  7296, 56385,  5120, 54465, 54657,  5440, 55041,  6080,  5760,
 
-     54849, 53761,  4800,  4992, 54081,  4352, 53697, 53377,  4160, 61441, 12480,
 
-     12672, 61761, 13056, 62401, 62081, 12864, 13824, 63169, 63361, 14144, 62721,
 
-     13760, 13440, 62529, 15360, 64705, 64897, 15680, 65281, 16320, 16000, 65089,
 
-     64001, 15040, 15232, 64321, 14592, 63937, 63617, 14400, 10240, 59585, 59777,
 
-     10560, 60161, 11200, 10880, 59969, 60929, 11968, 12160, 61249, 11520, 60865,
 
-     60545, 11328, 58369,  9408,  9600, 58689,  9984, 59329, 59009,  9792,  8704,
 
-     58049, 58241,  9024, 57601,  8640,  8320, 57409, 40961, 24768, 24960, 41281,
 
-     25344, 41921, 41601, 25152, 26112, 42689, 42881, 26432, 42241, 26048, 25728,
 
-     42049, 27648, 44225, 44417, 27968, 44801, 28608, 28288, 44609, 43521, 27328,
 
-     27520, 43841, 26880, 43457, 43137, 26688, 30720, 47297, 47489, 31040, 47873,
 
-     31680, 31360, 47681, 48641, 32448, 32640, 48961, 32000, 48577, 48257, 31808,
 
-     46081, 29888, 30080, 46401, 30464, 47041, 46721, 30272, 29184, 45761, 45953,
 
-     29504, 45313, 29120, 28800, 45121, 20480, 37057, 37249, 20800, 37633, 21440,
 
-     21120, 37441, 38401, 22208, 22400, 38721, 21760, 38337, 38017, 21568, 39937,
 
-     23744, 23936, 40257, 24320, 40897, 40577, 24128, 23040, 39617, 39809, 23360,
 
-     39169, 22976, 22656, 38977, 34817, 18624, 18816, 35137, 19200, 35777, 35457,
 
-     19008, 19968, 36545, 36737, 20288, 36097, 19904, 19584, 35905, 17408, 33985,
 
-     34177, 17728, 34561, 18368, 18048, 34369, 33281, 17088, 17280, 33601, 16640,
 
-     33217, 32897, 16448)
 
- class NoResponseError(IOError):
 
-     pass
 
- class ChecksumError(IOError):
 
-     pass
 
- class MBError(IOError):
 
-     pass
 
- class ModbusMixin:
 
-     def print_hex(self, text: str, data: bytes):
 
-         print(text, *tuple(map(lambda x: '0x{:02X}'.format(x), (i for i in data))))
 
- class Modbus(ModbusMixin):
 
-     # MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
 
-     REF_TYPE = 6
 
-     MB_TIMEOUT: float = 0.5 # 0.05
 
-     MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
 
-     MB_DEBUG: bool = False
 
-     MB_TRIES: int = 3
 
-     # def __init__(self, tty: str, brate: int, address: int):
 
-     def __init__(self, serial: Serial, address: int):
 
-         self.serial = serial
 
-         self.address = address
 
-     @classmethod
 
-     def test(cls):
 
-         print(type(cls.MB_CRC_TABLE))
 
-     @classmethod
 
-     def _crc(cls, data: bytes) -> bytes:
 
-         crc = 0xFFFF
 
-         crc_table = cls.MB_CRC_TABLE
 
-         for char in data:
 
-             crc = (crc >> 8) ^ crc_table[(crc ^ char) & 0xFF]
 
-         return crc.to_bytes(2, 'little')
 
-     def raw_communicate(self, data: bytes, predicted_length:int = -1) -> bytes:
 
-         """Send request and return it back with checksum"""
 
-         if self.MB_DEBUG:
 
-             self.print_hex('Request:', data)
 
-         self.serial.write(data)
 
-         response_bytes = bytearray()
 
-         start_time = time.time()
 
-         while True:
 
-             b = self.serial.read(1)
 
-             if len(b):
 
-                 new_byte = bytearray(b)
 
-                 response_bytes.extend(new_byte)
 
-             elif time.time() - start_time > self.MB_TIMEOUT:
 
-                 break
 
-             if len(response_bytes) == predicted_length:
 
-                 break
 
-         if self.MB_DEBUG:
 
-             self.print_hex('Responce:', response_bytes)
 
-         return response_bytes
 
-     
 
-     def communicate(self, request: bytes, predicted_length: int = -1) -> bytes:
 
-         """Send request and return rewponse after checksum check"""
 
-         pl = predicted_length + 2 if predicted_length != -1 else -1
 
-         response = self.raw_communicate(request + self._crc(request), pl)
 
-         if len(response) == 0:
 
-             raise NoResponseError('No response frome module')
 
-         crc_received = response[-2:]
 
-         crc_calculated = self._crc(response[:-2])
 
-         if crc_received != crc_calculated:
 
-             raise ChecksumError("CRC check failed (got: 0x{}, calculated: 0x{})".format(
 
-                 b2a_hex(crc_received).decode().upper(), b2a_hex(crc_calculated).decode().upper()))
 
-         return response
 
-     def mb_func(self, code: int, req: bytes, predicted_response_length: int = -1):
 
-         """Call typical MB function with address and function code check,
 
-         return just response, no header and checksum included"""
 
-         error = None
 
-         for i in range(self.MB_TRIES):
 
-             try:
 
-                 pl = predicted_response_length + 2 if predicted_response_length != -1 else -1
 
-                 response = self.communicate(b''.join((self.address.to_bytes(1, 'big'),
 
-                                                       code.to_bytes(1, 'big'), req)), pl)
 
-                 if response[0] != self.address:
 
-                     raise MBError("Modbus address mismatch in request and response ({}/{})".format(
 
-                         self.address, response[0]))
 
-                 elif response[1] != code:
 
-                     raise MBError("Modbus function opcode mismatch in request and response ({}/{})".format(
 
-                         code, response[1]))
 
-                 return response[2:-2]
 
-             except (MBError, ChecksumError, NoResponseError) as e:
 
-                 if self.MB_DEBUG:
 
-                     print(e)
 
-                 error = e
 
-                 time.sleep(0.5)
 
-         raise error
 
-     def _read_registers(self, opcode: int, address: int, count: int, raw: bool = False) -> Union[Sequence[int], bytes]:
 
-         """Read values of inpurt/holding registers from device"""
 
-         response = self.mb_func(opcode, address.to_bytes(2, 'big') + count.to_bytes(2, 'big'), 1 + 2*count)
 
-         try:
 
-             if raw:
 
-                 return response[1:]
 
-             inp_count = response[0] // 2
 
-             return tuple(int.from_bytes(response[i:i + 2], 'big') for i in range(1, inp_count*2, 2))
 
-         except KeyError:
 
-             raise MBError("Incorrect response payload")
 
-     # 0x03
 
-     def read_holding_registers(self, address: int, count: int) -> Sequence[int]:
 
-         return self._read_registers(3, address, count)
 
-     def read_holding_registers_raw(self, address: int, count: int) -> bytes:
 
-         return self._read_registers(3, address, count, True)
 
-     def read_uint32_holding(self, address: int) -> int:
 
-         """Read 32-bit integer from holding registers"""
 
-         return struct.unpack('>I', self.read_holding_registers_raw(address, 2))[0]
 
-     def read_uint64_holding(self, address: int) -> int:
 
-         """Read 64-bit integer from holding registers"""
 
-         return struct.unpack('>Q', self.read_holding_registers_raw(address, 4))[0]        
 
-     def read_float_holding(self, address: int) -> float:
 
-         """Read float integer from two holding registers"""
 
-         return struct.unpack('>f', self.read_holding_registers_raw(address, 2))[0]
 
-     # 0x14
 
-     def read_file_record(self, rec_type: int, channel_number: int, rec_index: int, rec_num: int):
 
-         """Read file record
 
-         rec_type - код записи, 0x06 - архив, 0x07 - журнал
 
-         channel_number - номер канала архива (0 если это журнал)
 
-         rec_index - порядковый номер (индекс) записи
 
-         rec_num - количество записей
 
-         """
 
-         request = bytearray((self.address,  0x14, 7, rec_type))
 
-         request += rec_index.to_bytes(2, 'big')
 
-         request += channel_number.to_bytes(2, 'big')
 
-         request += rec_num.to_bytes(2, 'big')
 
-         return self.raw_communicate(request + self._crc(request))
 
-     # 0x10
 
-     def write_holding_registers_raw(self, address:int, values: bytes):
 
-         """Write 16-bit integers to holding registers on device"""
 
-         request = bytearray(address.to_bytes(2, 'big'))
 
-         request += (len(values) // 2).to_bytes(2, 'big')
 
-         request += len(values).to_bytes(1, 'big')
 
-         request += values
 
-         response = self.mb_func(16, bytes(request), 4)
 
-         if request[:4] != response:
 
-             raise MBError('Incorrect response payload')
 
-     
 
-     #
 
-     def write_holding_register(self, address: int, value: int):
 
-         """Write 16-bit integer to register on device"""
 
-         request = address.to_bytes(2, 'big') + value.to_bytes(2, 'big')
 
-         response = self.mb_func(6, request, len(request))
 
-         if request != response:
 
-             raise MBError('Incorrect response payload')
 
-     def write_uint32(self, address: int, value: int):
 
-         """Write 32-bit integer to holding register"""
 
-         self.write_holding_registers_raw(address, struct.pack('>I', value))
 
-     def write_uint64(self, address: int, value: int):
 
-         """Write 64-bit integer to holding register"""
 
-         self.write_holding_registers_raw(address, struct.pack('>Q', value))
 
-     def write_float(self, address: int, value: float):
 
-         """Write float to two holding registers"""
 
-         self.write_holding_registers_raw(address, struct.pack('>f', value))
 
-     def test_send(self, data: bytes):
 
-         while True:
 
-             self.serial.write(data)
 
-             time.sleep(1)
 
-     def send_recv(self, data, number):
 
-         self.serial.write(data)
 
-         
 
-         # while True:
 
-         #     self.serial.write(data)
 
-         #     recv = self.serial.read_all()
 
-         #     print(recv)
 
-         #     print(recv.decode('utf_8'))
 
-         #     time.sleep(1)
 
- def main():
 
-     ADDR = 1
 
-     colorama.init()
 
-     dev = Modbus('COM22', 115200, ADDR)
 
-     
 
-     # print(dev.read_holding_registers(0x0100, 1))
 
-     # print(dev.read_uint32_holding(0x0100))
 
-     # time.sleep(1)
 
- if __name__ == '__main__':
 
-     main()
 
 
  |