# modbus.py - Modbus client over a serial port: CRC-16 framed requests, retries, holding-register reads.

import time
from binascii import b2a_hex
from typing import Optional, Sequence, Union

import colorama
from colorama import Fore, Style
from serial import Serial
  7. DEFAULT_MB_CRC_TABLE = (
  8. 00000, 49345, 49537, 320, 49921, 960, 640, 49729, 50689, 1728, 1920,
  9. 51009, 1280, 50625, 50305, 1088, 52225, 3264, 3456, 52545, 3840, 53185,
  10. 52865, 3648, 2560, 51905, 52097, 2880, 51457, 2496, 2176, 51265, 55297,
  11. 6_336, 6528, 55617, 6912, 56257, 55937, 6720, 7680, 57025, 57217, 8000,
  12. 56577, 7616, 7296, 56385, 5120, 54465, 54657, 5440, 55041, 6080, 5760,
  13. 54849, 53761, 4800, 4992, 54081, 4352, 53697, 53377, 4160, 61441, 12480,
  14. 12672, 61761, 13056, 62401, 62081, 12864, 13824, 63169, 63361, 14144, 62721,
  15. 13760, 13440, 62529, 15360, 64705, 64897, 15680, 65281, 16320, 16000, 65089,
  16. 64001, 15040, 15232, 64321, 14592, 63937, 63617, 14400, 10240, 59585, 59777,
  17. 10560, 60161, 11200, 10880, 59969, 60929, 11968, 12160, 61249, 11520, 60865,
  18. 60545, 11328, 58369, 9408, 9600, 58689, 9984, 59329, 59009, 9792, 8704,
  19. 58049, 58241, 9024, 57601, 8640, 8320, 57409, 40961, 24768, 24960, 41281,
  20. 25344, 41921, 41601, 25152, 26112, 42689, 42881, 26432, 42241, 26048, 25728,
  21. 42049, 27648, 44225, 44417, 27968, 44801, 28608, 28288, 44609, 43521, 27328,
  22. 27520, 43841, 26880, 43457, 43137, 26688, 30720, 47297, 47489, 31040, 47873,
  23. 31680, 31360, 47681, 48641, 32448, 32640, 48961, 32000, 48577, 48257, 31808,
  24. 46081, 29888, 30080, 46401, 30464, 47041, 46721, 30272, 29184, 45761, 45953,
  25. 29504, 45313, 29120, 28800, 45121, 20480, 37057, 37249, 20800, 37633, 21440,
  26. 21120, 37441, 38401, 22208, 22400, 38721, 21760, 38337, 38017, 21568, 39937,
  27. 23744, 23936, 40257, 24320, 40897, 40577, 24128, 23040, 39617, 39809, 23360,
  28. 39169, 22976, 22656, 38977, 34817, 18624, 18816, 35137, 19200, 35777, 35457,
  29. 19008, 19968, 36545, 36737, 20288, 36097, 19904, 19584, 35905, 17408, 33985,
  30. 34177, 17728, 34561, 18368, 18048, 34369, 33281, 17088, 17280, 33601, 16640,
  31. 33217, 32897, 16448)
  32. class NoResponseError(IOError):
  33. pass
  34. class ChecksumError(IOError):
  35. pass
  36. class MBError(IOError):
  37. pass
  38. class ModbusMixin():
  39. def print_hex(self, text: str, data: bytes):
  40. print(text, *tuple(map(lambda x: '0x{:02X}'.format(x), (i for i in data))))
  41. class Modbus(ModbusMixin):
  42. # MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
  43. MB_TIMEOUT: float = 0.05
  44. MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
  45. MB_DEBUG: bool = True
  46. MB_TRIES: int = 3
  47. def __init__(self, tty: str, brate: int, address: int):
  48. self.serial = Serial(port=tty, baudrate=brate, timeout=0.05, parity='N', xonxoff=False)
  49. self.address = address
  50. @classmethod
  51. def test(cls):
  52. print(type(cls.MB_CRC_TABLE))
  53. @classmethod
  54. def _crc(cls, data: bytes) -> bytes:
  55. crc = 0xFFFF
  56. crc_table = cls.MB_CRC_TABLE
  57. for char in data:
  58. crc = (crc >> 8) ^ crc_table[(crc ^ char) & 0xFF]
  59. return crc.to_bytes(2, 'little')
  60. def raw_communicate(self, data: bytes, predicted_length:int = -1) -> bytes:
  61. """Send request and return it back with checksum"""
  62. if self.MB_DEBUG:
  63. self.print_hex('Request:', data)
  64. self.serial.write(data)
  65. response_bytes = bytearray()
  66. start_time = time.time()
  67. while True:
  68. b = self.serial.read(1)
  69. if len(b):
  70. new_byte = bytearray(b)
  71. response_bytes.extend(new_byte)
  72. elif time.time() - start_time > self.MB_TIMEOUT:
  73. break
  74. if len(response_bytes) == predicted_length:
  75. break
  76. if self.MB_DEBUG:
  77. self.print_hex('Responce:', response_bytes)
  78. return response_bytes
  79. def communicate(self, request: bytes, predicted_length: int = -1) -> bytes:
  80. """Send request and return rewponse after checksum check"""
  81. pl = predicted_length + 2 if predicted_length != -1 else -1
  82. response = self.raw_communicate(request + self._crc(request), pl)
  83. if len(response) == 0:
  84. raise NoResponseError('No response frome module')
  85. crc_received = response[-2:]
  86. crc_calculated = self._crc(response[:-2])
  87. if crc_received != crc_calculated:
  88. raise ChecksumError("CRC check dailed (got: 0x{}, calculated: 0x{})".format(
  89. b2a_hex(crc_received).decode().upper(), b2a_hex(crc_calculated).decode().upper()))
  90. return response
  91. def mb_func(self, code: int, req: bytes, predicted_response_length: int = -1):
  92. """Call typical MB function with address and function code check,
  93. return just response, no header and checksum included"""
  94. error = None
  95. for i in range(self.MB_TRIES):
  96. try:
  97. pl = predicted_response_length + 2 if predicted_response_length != -1 else -1
  98. response = self.communicate(b''.join((self.address.to_bytes(1, 'big'),
  99. code.to_bytes(1, 'big'), req)), pl)
  100. if response[0] != self.address:
  101. raise MBError("Modbus address mismatch in request and response ({}/{})".format(
  102. self.address, response[0]))
  103. elif response[1] != code:
  104. raise MBError("Modbus function opcode mismatch in request and response ({}/{})".format(
  105. code, response[1]))
  106. return response[2:-2]
  107. except (MBError, ChecksumError, NoResponseError) as e:
  108. if self.MB_DEBUG:
  109. print(e)
  110. error = e
  111. time.sleep(0.5)
  112. raise error
  113. def _read_registers(self, opcode: int, address: int, count: int, raw: bool = False) -> Union[Sequence[int], bytes]:
  114. """Read values of inpurt/holding registers from device"""
  115. response = self.mb_func(opcode, address.to_bytes(2, 'big') + count.to_bytes(2, 'big'), 1 + 2*count)
  116. try:
  117. if raw:
  118. return response[1:]
  119. inp_count = response[0] // 2
  120. return tuple(int.from_bytes(response[i:i + 2], 'big') for i in range(1, inp_count*2, 2))
  121. except KeyError:
  122. raise MBError("Incorrect response payload")
  123. # 0x03
  124. def read_holding_registers(self, address: int, count: int) -> Sequence[int]:
  125. return self._read_registers(3, address, count)
  126. def test_send(self, data: bytes):
  127. while True:
  128. self.serial.write(data)
  129. time.sleep(1)
  130. def send_recv(self, data, number):
  131. self.serial.write(data)
  132. # while True:
  133. # self.serial.write(data)
  134. # recv = self.serial.read_all()
  135. # print(recv)
  136. # print(recv.decode('utf_8'))
  137. # time.sleep(1)
  138. def main():
  139. colorama.init()
  140. # st = 'Hello world'
  141. # st.encode('utf-8')
  142. # print(st.encode('utf-8'))
  143. dev = Modbus('COM22', 115200, 1)
  144. # to_send = b'\x01\x03\x01\x00\x00\x01'
  145. # dev.communicate(to_send, 7)
  146. # Пример чтения одного регистра по адресу 0x0100
  147. while True:
  148. print(dev.read_holding_registers(0x0100, 1))
  149. time.sleep(1)
  150. # dev._read_registers(0x03, 0x0100, 1, False)
  151. # print(dev._read_registers(0x03, 0x0100, 1, True))
  152. # dev.communicate()
  153. # dev.test_send(bytes('hello world\r\n', encoding="utf_8"))
  154. # tx_data = 'hello world\r\n'
  155. # tx_data = 'abc'
  156. # dev.send_recv(bytes(tx_data, encoding="utf_8"), len(tx_data))
  157. # Modbus.test()
  158. # print(i.to_bytes(2, 'big'))
  159. # print(Modbus.crc(i.to_bytes(1, 'big')))
  160. # print(Modbus.crc(b'\x00\x01'))
  161. # crc = Modbus.crc(b'\x00\x01')
  162. # dev.raw_communicate(b'\x00\x01')
  163. # print(ModbusMixin.print_hex(crc))
  164. # print(Modbus.crc(b'\xd0\x00'))
  165. if __name__ == '__main__':
  166. main()
  167. # a = 11
  168. # print(a.to_bytes(2, 'big'))