# modbus.py
import struct
import time
from binascii import b2a_hex
from typing import Optional, Sequence, Union

import colorama
from colorama import Fore, Style
from serial import Serial
  8. DEFAULT_MB_CRC_TABLE = (
  9. 00000, 49345, 49537, 320, 49921, 960, 640, 49729, 50689, 1728, 1920,
  10. 51009, 1280, 50625, 50305, 1088, 52225, 3264, 3456, 52545, 3840, 53185,
  11. 52865, 3648, 2560, 51905, 52097, 2880, 51457, 2496, 2176, 51265, 55297,
  12. 6_336, 6528, 55617, 6912, 56257, 55937, 6720, 7680, 57025, 57217, 8000,
  13. 56577, 7616, 7296, 56385, 5120, 54465, 54657, 5440, 55041, 6080, 5760,
  14. 54849, 53761, 4800, 4992, 54081, 4352, 53697, 53377, 4160, 61441, 12480,
  15. 12672, 61761, 13056, 62401, 62081, 12864, 13824, 63169, 63361, 14144, 62721,
  16. 13760, 13440, 62529, 15360, 64705, 64897, 15680, 65281, 16320, 16000, 65089,
  17. 64001, 15040, 15232, 64321, 14592, 63937, 63617, 14400, 10240, 59585, 59777,
  18. 10560, 60161, 11200, 10880, 59969, 60929, 11968, 12160, 61249, 11520, 60865,
  19. 60545, 11328, 58369, 9408, 9600, 58689, 9984, 59329, 59009, 9792, 8704,
  20. 58049, 58241, 9024, 57601, 8640, 8320, 57409, 40961, 24768, 24960, 41281,
  21. 25344, 41921, 41601, 25152, 26112, 42689, 42881, 26432, 42241, 26048, 25728,
  22. 42049, 27648, 44225, 44417, 27968, 44801, 28608, 28288, 44609, 43521, 27328,
  23. 27520, 43841, 26880, 43457, 43137, 26688, 30720, 47297, 47489, 31040, 47873,
  24. 31680, 31360, 47681, 48641, 32448, 32640, 48961, 32000, 48577, 48257, 31808,
  25. 46081, 29888, 30080, 46401, 30464, 47041, 46721, 30272, 29184, 45761, 45953,
  26. 29504, 45313, 29120, 28800, 45121, 20480, 37057, 37249, 20800, 37633, 21440,
  27. 21120, 37441, 38401, 22208, 22400, 38721, 21760, 38337, 38017, 21568, 39937,
  28. 23744, 23936, 40257, 24320, 40897, 40577, 24128, 23040, 39617, 39809, 23360,
  29. 39169, 22976, 22656, 38977, 34817, 18624, 18816, 35137, 19200, 35777, 35457,
  30. 19008, 19968, 36545, 36737, 20288, 36097, 19904, 19584, 35905, 17408, 33985,
  31. 34177, 17728, 34561, 18368, 18048, 34369, 33281, 17088, 17280, 33601, 16640,
  32. 33217, 32897, 16448)
  33. class NoResponseError(IOError):
  34. pass
  35. class ChecksumError(IOError):
  36. pass
  37. class MBError(IOError):
  38. pass
  39. class ModbusMixin:
  40. def print_hex(self, text: str, data: bytes):
  41. print(text, *tuple(map(lambda x: '0x{:02X}'.format(x), (i for i in data))))
  42. class Modbus(ModbusMixin):
  43. # MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
  44. REF_TYPE = 6
  45. MB_TIMEOUT: float = 0.05 # 0.05
  46. MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
  47. MB_DEBUG: bool = False
  48. MB_TRIES: int = 3
  49. # def __init__(self, tty: str, brate: int, address: int):
  50. def __init__(self, serial: Serial, address: int):
  51. self.serial = serial
  52. self.address = address
  53. @classmethod
  54. def test(cls):
  55. print(type(cls.MB_CRC_TABLE))
  56. @classmethod
  57. def _crc(cls, data: bytes) -> bytes:
  58. crc = 0xFFFF
  59. crc_table = cls.MB_CRC_TABLE
  60. for char in data:
  61. crc = (crc >> 8) ^ crc_table[(crc ^ char) & 0xFF]
  62. return crc.to_bytes(2, 'little')
  63. def raw_communicate(self, data: bytes, predicted_length:int = -1) -> bytes:
  64. """Send request and return it back with checksum"""
  65. # print('start')
  66. time.sleep(0.001)
  67. if self.MB_DEBUG:
  68. self.print_hex('Request:', data)
  69. self.serial.write(data)
  70. response_bytes = bytearray()
  71. start_time = time.time()
  72. while True:
  73. b = self.serial.read(1)
  74. if len(b):
  75. new_byte = bytearray(b)
  76. response_bytes.extend(new_byte)
  77. elif time.time() - start_time > self.MB_TIMEOUT:
  78. break
  79. if len(response_bytes) == predicted_length:
  80. # print('!')
  81. break
  82. if self.MB_DEBUG:
  83. self.print_hex('Responce:', response_bytes)
  84. return response_bytes
  85. def communicate(self, request: bytes, predicted_length: int = -1) -> bytes:
  86. """Send request and return rewponse after checksum check"""
  87. pl = predicted_length + 2 if predicted_length != -1 else -1
  88. response = self.raw_communicate(request + self._crc(request), pl)
  89. if len(response) == 0:
  90. raise NoResponseError('No response frome module')
  91. crc_received = response[-2:]
  92. crc_calculated = self._crc(response[:-2])
  93. if crc_received != crc_calculated:
  94. raise ChecksumError("CRC check failed (got: 0x{}, calculated: 0x{})".format(
  95. b2a_hex(crc_received).decode().upper(), b2a_hex(crc_calculated).decode().upper()))
  96. return response
  97. def mb_func(self, code: int, req: bytes, predicted_response_length: int = -1):
  98. """Call typical MB function with address and function code check,
  99. return just response, no header and checksum included"""
  100. error = None
  101. for i in range(self.MB_TRIES):
  102. try:
  103. pl = predicted_response_length + 2 if predicted_response_length != -1 else -1
  104. response = self.communicate(b''.join((self.address.to_bytes(1, 'big'),
  105. code.to_bytes(1, 'big'), req)), pl)
  106. if response[0] != self.address:
  107. raise MBError("Modbus address mismatch in request and response ({}/{})".format(
  108. self.address, response[0]))
  109. elif response[1] != code:
  110. raise MBError("Modbus function opcode mismatch in request and response ({}/{})".format(
  111. code, response[1]))
  112. return response[2:-2]
  113. except (MBError, ChecksumError, NoResponseError) as e:
  114. if self.MB_DEBUG:
  115. print(e)
  116. error = e
  117. time.sleep(0.5)
  118. raise error
  119. def _read_registers(self, opcode: int, address: int, count: int, raw: bool = False) -> Union[Sequence[int], bytes]:
  120. """Read values of inpurt/holding registers from device"""
  121. response = self.mb_func(opcode, address.to_bytes(2, 'big') + count.to_bytes(2, 'big'), 1 + 2*count)
  122. try:
  123. if raw:
  124. return response[1:]
  125. inp_count = response[0] // 2
  126. return tuple(int.from_bytes(response[i:i + 2], 'big') for i in range(1, inp_count*2, 2))
  127. except KeyError:
  128. raise MBError("Incorrect response payload")
  129. # 0x03
  130. def read_holding_registers(self, address: int, count: int) -> Sequence[int]:
  131. return self._read_registers(3, address, count)
  132. def read_holding_registers_raw(self, address: int, count: int) -> bytes:
  133. return self._read_registers(3, address, count, True)
  134. def read_uint32_holding(self, address: int) -> int:
  135. """Read 32-bit integer from holding registers"""
  136. return struct.unpack('>I', self.read_holding_registers_raw(address, 2))[0]
  137. def read_uint64_holding(self, address: int) -> int:
  138. """Read 64-bit integer from holding registers"""
  139. return struct.unpack('>Q', self.read_holding_registers_raw(address, 4))[0]
  140. def read_float_holding(self, address: int) -> float:
  141. """Read float integer from two holding registers"""
  142. return struct.unpack('>f', self.read_holding_registers_raw(address, 2))[0]
  143. # 0x14
  144. def read_file_record(self, rec_type: int, channel_number: int, rec_index: int, rec_num: int):
  145. """Read file record
  146. rec_type - код записи, 0x06 - архив, 0x07 - журнал
  147. channel_number - номер канала архива (0 если это журнал)
  148. rec_index - порядковый номер (индекс) записи
  149. rec_num - количество записей
  150. """
  151. request = bytearray((self.address, 0x14, 7, rec_type))
  152. request += rec_index.to_bytes(2, 'big')
  153. request += channel_number.to_bytes(2, 'big')
  154. request += rec_num.to_bytes(2, 'big')
  155. return self.raw_communicate(request + self._crc(request))
  156. # 0x10
  157. def write_holding_registers_raw(self, address:int, values: bytes):
  158. """Write 16-bit integers to holding registers on device"""
  159. request = bytearray(address.to_bytes(2, 'big'))
  160. request += (len(values) // 2).to_bytes(2, 'big')
  161. request += len(values).to_bytes(1, 'big')
  162. request += values
  163. response = self.mb_func(16, bytes(request), 4)
  164. if request[:4] != response:
  165. raise MBError('Incorrect response payload')
  166. #
  167. def write_holding_register(self, address: int, value: int):
  168. """Write 16-bit integer to register on device"""
  169. request = address.to_bytes(2, 'big') + value.to_bytes(2, 'big')
  170. response = self.mb_func(6, request, len(request))
  171. if request != response:
  172. raise MBError('Incorrect response payload')
  173. def write_uint32(self, address: int, value: int):
  174. """Write 32-bit integer to holding register"""
  175. self.write_holding_registers_raw(address, struct.pack('>I', value))
  176. def write_uint64(self, address: int, value: int):
  177. """Write 64-bit integer to holding register"""
  178. self.write_holding_registers_raw(address, struct.pack('>Q', value))
  179. def write_float(self, address: int, value: float):
  180. """Write float to two holding registers"""
  181. self.write_holding_registers_raw(address, struct.pack('>f', value))
  182. def test_send(self, data: bytes):
  183. while True:
  184. self.serial.write(data)
  185. time.sleep(1)
  186. def send_recv(self, data, number):
  187. self.serial.write(data)
  188. # while True:
  189. # self.serial.write(data)
  190. # recv = self.serial.read_all()
  191. # print(recv)
  192. # print(recv.decode('utf_8'))
  193. # time.sleep(1)
  194. def main():
  195. ADDR = 1
  196. colorama.init()
  197. dev = Modbus('COM22', 115200, ADDR)
  198. # print(dev.read_holding_registers(0x0100, 1))
  199. # print(dev.read_uint32_holding(0x0100))
  200. # time.sleep(1)
  201. if __name__ == '__main__':
  202. main()