modbus.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. import time
  2. from serial import Serial
  3. import colorama
  4. import struct
  5. from colorama import Fore, Style
  6. from typing import Sequence, Union
  7. from binascii import b2a_hex
  8. DEFAULT_MB_CRC_TABLE = (
  9. 00000, 49345, 49537, 320, 49921, 960, 640, 49729, 50689, 1728, 1920,
  10. 51009, 1280, 50625, 50305, 1088, 52225, 3264, 3456, 52545, 3840, 53185,
  11. 52865, 3648, 2560, 51905, 52097, 2880, 51457, 2496, 2176, 51265, 55297,
  12. 6_336, 6528, 55617, 6912, 56257, 55937, 6720, 7680, 57025, 57217, 8000,
  13. 56577, 7616, 7296, 56385, 5120, 54465, 54657, 5440, 55041, 6080, 5760,
  14. 54849, 53761, 4800, 4992, 54081, 4352, 53697, 53377, 4160, 61441, 12480,
  15. 12672, 61761, 13056, 62401, 62081, 12864, 13824, 63169, 63361, 14144, 62721,
  16. 13760, 13440, 62529, 15360, 64705, 64897, 15680, 65281, 16320, 16000, 65089,
  17. 64001, 15040, 15232, 64321, 14592, 63937, 63617, 14400, 10240, 59585, 59777,
  18. 10560, 60161, 11200, 10880, 59969, 60929, 11968, 12160, 61249, 11520, 60865,
  19. 60545, 11328, 58369, 9408, 9600, 58689, 9984, 59329, 59009, 9792, 8704,
  20. 58049, 58241, 9024, 57601, 8640, 8320, 57409, 40961, 24768, 24960, 41281,
  21. 25344, 41921, 41601, 25152, 26112, 42689, 42881, 26432, 42241, 26048, 25728,
  22. 42049, 27648, 44225, 44417, 27968, 44801, 28608, 28288, 44609, 43521, 27328,
  23. 27520, 43841, 26880, 43457, 43137, 26688, 30720, 47297, 47489, 31040, 47873,
  24. 31680, 31360, 47681, 48641, 32448, 32640, 48961, 32000, 48577, 48257, 31808,
  25. 46081, 29888, 30080, 46401, 30464, 47041, 46721, 30272, 29184, 45761, 45953,
  26. 29504, 45313, 29120, 28800, 45121, 20480, 37057, 37249, 20800, 37633, 21440,
  27. 21120, 37441, 38401, 22208, 22400, 38721, 21760, 38337, 38017, 21568, 39937,
  28. 23744, 23936, 40257, 24320, 40897, 40577, 24128, 23040, 39617, 39809, 23360,
  29. 39169, 22976, 22656, 38977, 34817, 18624, 18816, 35137, 19200, 35777, 35457,
  30. 19008, 19968, 36545, 36737, 20288, 36097, 19904, 19584, 35905, 17408, 33985,
  31. 34177, 17728, 34561, 18368, 18048, 34369, 33281, 17088, 17280, 33601, 16640,
  32. 33217, 32897, 16448)
  33. class NoResponseError(IOError):
  34. pass
  35. class ChecksumError(IOError):
  36. pass
  37. class MBError(IOError):
  38. pass
  39. class ModbusMixin:
  40. def print_hex(self, text: str, data: bytes):
  41. print(text, *tuple(map(lambda x: '0x{:02X}'.format(x), (i for i in data))))
  42. class Modbus(ModbusMixin):
  43. # MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
  44. REF_TYPE = 6
  45. MB_TIMEOUT: float = 0.5 # 0.05
  46. MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
  47. MB_DEBUG: bool = False
  48. MB_TRIES: int = 3
  49. # def __init__(self, tty: str, brate: int, address: int):
  50. def __init__(self, serial: Serial, address: int):
  51. self.serial = serial
  52. self.address = address
  53. @classmethod
  54. def test(cls):
  55. print(type(cls.MB_CRC_TABLE))
  56. @classmethod
  57. def _crc(cls, data: bytes) -> bytes:
  58. crc = 0xFFFF
  59. crc_table = cls.MB_CRC_TABLE
  60. for char in data:
  61. crc = (crc >> 8) ^ crc_table[(crc ^ char) & 0xFF]
  62. return crc.to_bytes(2, 'little')
  63. def raw_communicate(self, data: bytes, predicted_length:int = -1) -> bytes:
  64. """Send request and return it back with checksum"""
  65. if self.MB_DEBUG:
  66. self.print_hex('Request:', data)
  67. self.serial.write(data)
  68. response_bytes = bytearray()
  69. start_time = time.time()
  70. while True:
  71. b = self.serial.read(1)
  72. if len(b):
  73. new_byte = bytearray(b)
  74. response_bytes.extend(new_byte)
  75. elif time.time() - start_time > self.MB_TIMEOUT:
  76. break
  77. if len(response_bytes) == predicted_length:
  78. break
  79. if self.MB_DEBUG:
  80. self.print_hex('Responce:', response_bytes)
  81. return response_bytes
  82. def communicate(self, request: bytes, predicted_length: int = -1) -> bytes:
  83. """Send request and return rewponse after checksum check"""
  84. pl = predicted_length + 2 if predicted_length != -1 else -1
  85. response = self.raw_communicate(request + self._crc(request), pl)
  86. if len(response) == 0:
  87. raise NoResponseError('No response frome module')
  88. crc_received = response[-2:]
  89. crc_calculated = self._crc(response[:-2])
  90. if crc_received != crc_calculated:
  91. raise ChecksumError("CRC check failed (got: 0x{}, calculated: 0x{})".format(
  92. b2a_hex(crc_received).decode().upper(), b2a_hex(crc_calculated).decode().upper()))
  93. return response
  94. def mb_func(self, code: int, req: bytes, predicted_response_length: int = -1):
  95. """Call typical MB function with address and function code check,
  96. return just response, no header and checksum included"""
  97. error = None
  98. for i in range(self.MB_TRIES):
  99. try:
  100. pl = predicted_response_length + 2 if predicted_response_length != -1 else -1
  101. response = self.communicate(b''.join((self.address.to_bytes(1, 'big'),
  102. code.to_bytes(1, 'big'), req)), pl)
  103. if response[0] != self.address:
  104. raise MBError("Modbus address mismatch in request and response ({}/{})".format(
  105. self.address, response[0]))
  106. elif response[1] != code:
  107. raise MBError("Modbus function opcode mismatch in request and response ({}/{})".format(
  108. code, response[1]))
  109. return response[2:-2]
  110. except (MBError, ChecksumError, NoResponseError) as e:
  111. if self.MB_DEBUG:
  112. print(e)
  113. error = e
  114. time.sleep(0.5)
  115. raise error
  116. def _read_registers(self, opcode: int, address: int, count: int, raw: bool = False) -> Union[Sequence[int], bytes]:
  117. """Read values of inpurt/holding registers from device"""
  118. response = self.mb_func(opcode, address.to_bytes(2, 'big') + count.to_bytes(2, 'big'), 1 + 2*count)
  119. try:
  120. if raw:
  121. return response[1:]
  122. inp_count = response[0] // 2
  123. return tuple(int.from_bytes(response[i:i + 2], 'big') for i in range(1, inp_count*2, 2))
  124. except KeyError:
  125. raise MBError("Incorrect response payload")
  126. # 0x03
  127. def read_holding_registers(self, address: int, count: int) -> Sequence[int]:
  128. return self._read_registers(3, address, count)
  129. def read_holding_registers_raw(self, address: int, count: int) -> bytes:
  130. return self._read_registers(3, address, count, True)
  131. def read_uint32_holding(self, address: int) -> int:
  132. """Read 32-bit integer from holding registers"""
  133. return struct.unpack('>I', self.read_holding_registers_raw(address, 2))[0]
  134. def read_uint64_holding(self, address: int) -> int:
  135. """Read 64-bit integer from holding registers"""
  136. return struct.unpack('>Q', self.read_holding_registers_raw(address, 4))[0]
  137. # 0x10
  138. def write_holding_registers_raw(self, address:int, values: bytes):
  139. """Write 16-bit integers to holding registers on device"""
  140. request = bytearray(address.to_bytes(2, 'big'))
  141. request += (len(values) // 2).to_bytes(2, 'big')
  142. request += len(values).to_bytes(1, 'big')
  143. request += values
  144. response = self.mb_func(16, bytes(request), 4)
  145. if request[:4] != response:
  146. raise MBError('Incorrect response payload')
  147. #
  148. def write_holding_register(self, address: int, value: int):
  149. """Write 16-bit integer to register on device"""
  150. request = address.to_bytes(2, 'big') + value.to_bytes(2, 'big')
  151. response = self.mb_func(6, request, len(request))
  152. if request != response:
  153. raise MBError('Incorrect response payload')
  154. # 0x14
  155. def read_file_record(self, rec_type: int, channel_number: int, rec_index: int, rec_num: int):
  156. """Read file record
  157. rec_type - код записи, 0x06 - архив, 0x07 - журнал
  158. channel_number - номер канала архива (0 если это журнал)
  159. rec_index - порядковый номер (индекс) записи
  160. rec_num - количество записей
  161. """
  162. request = bytearray((self.address, 0x14, 7, rec_type))
  163. request += rec_index.to_bytes(2, 'big')
  164. request += channel_number.to_bytes(2, 'big')
  165. request += rec_num.to_bytes(2, 'big')
  166. return self.raw_communicate(request + self._crc(request))
  167. def write_uint32(self, address: int, value: int):
  168. """Write 32-bit integer to holding register"""
  169. self.write_holding_registers_raw(address, struct.pack('>I', value))
  170. def write_uint64(self, address: int, value: int):
  171. """Write 64-bit integer to holding register"""
  172. self.write_holding_registers_raw(address, struct.pack('>Q', value))
  173. def test_send(self, data: bytes):
  174. while True:
  175. self.serial.write(data)
  176. time.sleep(1)
  177. def send_recv(self, data, number):
  178. self.serial.write(data)
  179. # while True:
  180. # self.serial.write(data)
  181. # recv = self.serial.read_all()
  182. # print(recv)
  183. # print(recv.decode('utf_8'))
  184. # time.sleep(1)
  185. def main():
  186. ADDR = 1
  187. colorama.init()
  188. dev = Modbus('COM22', 115200, ADDR)
  189. # print(dev.read_holding_registers(0x0100, 1))
  190. # print(dev.read_uint32_holding(0x0100))
  191. # time.sleep(1)
  192. if __name__ == '__main__':
  193. main()