# modbus.py
import struct
import time
from binascii import b2a_hex
from typing import Sequence, Union

import colorama
from colorama import Fore, Style
from serial import Serial
  8. DEFAULT_MB_CRC_TABLE = (
  9. 00000, 49345, 49537, 320, 49921, 960, 640, 49729, 50689, 1728, 1920,
  10. 51009, 1280, 50625, 50305, 1088, 52225, 3264, 3456, 52545, 3840, 53185,
  11. 52865, 3648, 2560, 51905, 52097, 2880, 51457, 2496, 2176, 51265, 55297,
  12. 6_336, 6528, 55617, 6912, 56257, 55937, 6720, 7680, 57025, 57217, 8000,
  13. 56577, 7616, 7296, 56385, 5120, 54465, 54657, 5440, 55041, 6080, 5760,
  14. 54849, 53761, 4800, 4992, 54081, 4352, 53697, 53377, 4160, 61441, 12480,
  15. 12672, 61761, 13056, 62401, 62081, 12864, 13824, 63169, 63361, 14144, 62721,
  16. 13760, 13440, 62529, 15360, 64705, 64897, 15680, 65281, 16320, 16000, 65089,
  17. 64001, 15040, 15232, 64321, 14592, 63937, 63617, 14400, 10240, 59585, 59777,
  18. 10560, 60161, 11200, 10880, 59969, 60929, 11968, 12160, 61249, 11520, 60865,
  19. 60545, 11328, 58369, 9408, 9600, 58689, 9984, 59329, 59009, 9792, 8704,
  20. 58049, 58241, 9024, 57601, 8640, 8320, 57409, 40961, 24768, 24960, 41281,
  21. 25344, 41921, 41601, 25152, 26112, 42689, 42881, 26432, 42241, 26048, 25728,
  22. 42049, 27648, 44225, 44417, 27968, 44801, 28608, 28288, 44609, 43521, 27328,
  23. 27520, 43841, 26880, 43457, 43137, 26688, 30720, 47297, 47489, 31040, 47873,
  24. 31680, 31360, 47681, 48641, 32448, 32640, 48961, 32000, 48577, 48257, 31808,
  25. 46081, 29888, 30080, 46401, 30464, 47041, 46721, 30272, 29184, 45761, 45953,
  26. 29504, 45313, 29120, 28800, 45121, 20480, 37057, 37249, 20800, 37633, 21440,
  27. 21120, 37441, 38401, 22208, 22400, 38721, 21760, 38337, 38017, 21568, 39937,
  28. 23744, 23936, 40257, 24320, 40897, 40577, 24128, 23040, 39617, 39809, 23360,
  29. 39169, 22976, 22656, 38977, 34817, 18624, 18816, 35137, 19200, 35777, 35457,
  30. 19008, 19968, 36545, 36737, 20288, 36097, 19904, 19584, 35905, 17408, 33985,
  31. 34177, 17728, 34561, 18368, 18048, 34369, 33281, 17088, 17280, 33601, 16640,
  32. 33217, 32897, 16448)
  33. class NoResponseError(IOError):
  34. pass
  35. class ChecksumError(IOError):
  36. pass
  37. class MBError(IOError):
  38. pass
  39. class ModbusMixin():
  40. def print_hex(self, text: str, data: bytes):
  41. print(text, *tuple(map(lambda x: '0x{:02X}'.format(x), (i for i in data))))
  42. class Modbus(ModbusMixin):
  43. # MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
  44. MB_TIMEOUT: float = 0.05
  45. MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
  46. MB_DEBUG: bool = False
  47. MB_TRIES: int = 3
  48. def __init__(self, tty: str, brate: int, address: int):
  49. self.serial = Serial(port=tty, baudrate=brate, timeout=0.05, parity='N', xonxoff=False)
  50. self.address = address
  51. @classmethod
  52. def test(cls):
  53. print(type(cls.MB_CRC_TABLE))
  54. @classmethod
  55. def _crc(cls, data: bytes) -> bytes:
  56. crc = 0xFFFF
  57. crc_table = cls.MB_CRC_TABLE
  58. for char in data:
  59. crc = (crc >> 8) ^ crc_table[(crc ^ char) & 0xFF]
  60. return crc.to_bytes(2, 'little')
  61. def raw_communicate(self, data: bytes, predicted_length:int = -1) -> bytes:
  62. """Send request and return it back with checksum"""
  63. if self.MB_DEBUG:
  64. self.print_hex('Request:', data)
  65. self.serial.write(data)
  66. response_bytes = bytearray()
  67. start_time = time.time()
  68. while True:
  69. b = self.serial.read(1)
  70. if len(b):
  71. new_byte = bytearray(b)
  72. response_bytes.extend(new_byte)
  73. elif time.time() - start_time > self.MB_TIMEOUT:
  74. break
  75. if len(response_bytes) == predicted_length:
  76. break
  77. if self.MB_DEBUG:
  78. self.print_hex('Responce:', response_bytes)
  79. return response_bytes
  80. def communicate(self, request: bytes, predicted_length: int = -1) -> bytes:
  81. """Send request and return rewponse after checksum check"""
  82. pl = predicted_length + 2 if predicted_length != -1 else -1
  83. response = self.raw_communicate(request + self._crc(request), pl)
  84. if len(response) == 0:
  85. raise NoResponseError('No response frome module')
  86. crc_received = response[-2:]
  87. crc_calculated = self._crc(response[:-2])
  88. if crc_received != crc_calculated:
  89. raise ChecksumError("CRC check dailed (got: 0x{}, calculated: 0x{})".format(
  90. b2a_hex(crc_received).decode().upper(), b2a_hex(crc_calculated).decode().upper()))
  91. return response
  92. def mb_func(self, code: int, req: bytes, predicted_response_length: int = -1):
  93. """Call typical MB function with address and function code check,
  94. return just response, no header and checksum included"""
  95. error = None
  96. for i in range(self.MB_TRIES):
  97. try:
  98. pl = predicted_response_length + 2 if predicted_response_length != -1 else -1
  99. response = self.communicate(b''.join((self.address.to_bytes(1, 'big'),
  100. code.to_bytes(1, 'big'), req)), pl)
  101. if response[0] != self.address:
  102. raise MBError("Modbus address mismatch in request and response ({}/{})".format(
  103. self.address, response[0]))
  104. elif response[1] != code:
  105. raise MBError("Modbus function opcode mismatch in request and response ({}/{})".format(
  106. code, response[1]))
  107. return response[2:-2]
  108. except (MBError, ChecksumError, NoResponseError) as e:
  109. if self.MB_DEBUG:
  110. print(e)
  111. error = e
  112. time.sleep(0.5)
  113. raise error
  114. def _read_registers(self, opcode: int, address: int, count: int, raw: bool = False) -> Union[Sequence[int], bytes]:
  115. """Read values of inpurt/holding registers from device"""
  116. response = self.mb_func(opcode, address.to_bytes(2, 'big') + count.to_bytes(2, 'big'), 1 + 2*count)
  117. try:
  118. if raw:
  119. return response[1:]
  120. inp_count = response[0] // 2
  121. return tuple(int.from_bytes(response[i:i + 2], 'big') for i in range(1, inp_count*2, 2))
  122. except KeyError:
  123. raise MBError("Incorrect response payload")
  124. # 0x03
  125. def read_holding_registers(self, address: int, count: int) -> Sequence[int]:
  126. return self._read_registers(3, address, count)
  127. def read_holding_registers_raw(self, address: int, count: int) -> bytes:
  128. return self._read_registers(3, address, count, True)
  129. def read_uint32_holding(self, address: int) -> int:
  130. """Read 32-bit integer from holding registers"""
  131. return struct.unpack('>I', self.read_holding_registers_raw(address, 2))[0]
  132. # 0x10
  133. def write_holding_registers_raw(self, address:int, values: bytes):
  134. """Write 16-bit integers to holding registers on device"""
  135. request = bytearray(address.to_bytes(2, 'big'))
  136. request += (len(values) // 2).to_bytes(2, 'big')
  137. request += len(values).to_bytes(1, 'big')
  138. request += values
  139. response = self.mb_func(16, bytes(request), 4)
  140. if request[:4] != response:
  141. raise MBError('Incorrect response payload')
  142. #
  143. def write_holding_register(self, address: int, value: int):
  144. """Write 16-bit integer to register on device"""
  145. request = address.to_bytes(2, 'big') + value.to_bytes(2, 'big')
  146. response = self.mb_func(6, request, len(request))
  147. if request != response:
  148. raise MBError('Incorrect response payload')
  149. def write_uint32(self, address: int, value: int):
  150. """Write 32-bit integer to holding register"""
  151. self.write_holding_registers_raw(address, struct.pack('>I', value))
  152. def test_send(self, data: bytes):
  153. while True:
  154. self.serial.write(data)
  155. time.sleep(1)
  156. def send_recv(self, data, number):
  157. self.serial.write(data)
  158. # while True:
  159. # self.serial.write(data)
  160. # recv = self.serial.read_all()
  161. # print(recv)
  162. # print(recv.decode('utf_8'))
  163. # time.sleep(1)
  164. def main():
  165. ADDR = 1
  166. colorama.init()
  167. dev = Modbus('COM22', 115200, ADDR)
  168. # print(dev.read_holding_registers(0x0100, 1))
  169. # print(dev.read_uint32_holding(0x0100))
  170. # time.sleep(1)
  171. if __name__ == '__main__':
  172. main()