|
@@ -2,7 +2,8 @@ import time
|
|
|
from serial import Serial
|
|
|
import colorama
|
|
|
from colorama import Fore, Style
|
|
|
-from typing import Sequence
|
|
|
+from typing import Sequence, Union
|
|
|
+from binascii import b2a_hex
|
|
|
|
|
|
DEFAULT_MB_CRC_TABLE = (
|
|
|
00000, 49345, 49537, 320, 49921, 960, 640, 49729, 50689, 1728, 1920,
|
|
@@ -30,6 +31,14 @@ DEFAULT_MB_CRC_TABLE = (
|
|
|
34177, 17728, 34561, 18368, 18048, 34369, 33281, 17088, 17280, 33601, 16640,
|
|
|
33217, 32897, 16448)
|
|
|
|
|
|
+class NoResponseError(IOError):
|
|
|
+ pass
|
|
|
+
|
|
|
+class ChecksumError(IOError):
|
|
|
+ pass
|
|
|
+
|
|
|
+class MBError(IOError):
|
|
|
+ pass
|
|
|
|
|
|
class ModbusMixin():
|
|
|
def print_hex(self, text: str, data: bytes):
|
|
@@ -40,10 +49,11 @@ class Modbus(ModbusMixin):
|
|
|
|
|
|
# MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
|
|
|
MB_TIMEOUT: float = 0.05
|
|
|
- MB_CRC_TABLE: [int] = DEFAULT_MB_CRC_TABLE
|
|
|
+ MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
|
|
|
MB_DEBUG: bool = True
|
|
|
+ MB_TRIES: int = 3
|
|
|
|
|
|
- def __init__(self, tty: str, brate: int, address: str):
|
|
|
+ def __init__(self, tty: str, brate: int, address: int):
|
|
|
self.serial = Serial(port=tty, baudrate=brate, timeout=0.05, parity='N', xonxoff=False)
|
|
|
self.address = address
|
|
|
|
|
@@ -52,7 +62,7 @@ class Modbus(ModbusMixin):
|
|
|
print(type(cls.MB_CRC_TABLE))
|
|
|
|
|
|
@classmethod
|
|
|
- def crc(cls, data: bytes) -> bytes:
|
|
|
+ def _crc(cls, data: bytes) -> bytes:
|
|
|
crc = 0xFFFF
|
|
|
crc_table = cls.MB_CRC_TABLE
|
|
|
for char in data:
|
|
@@ -60,23 +70,75 @@ class Modbus(ModbusMixin):
|
|
|
return crc.to_bytes(2, 'little')
|
|
|
|
|
|
def raw_communicate(self, data: bytes, predicted_length:int = -1) -> bytes:
|
|
|
+ """Send request and return it back with checksum"""
|
|
|
if self.MB_DEBUG:
|
|
|
self.print_hex('Request:', data)
|
|
|
self.serial.write(data)
|
|
|
- response_bytes = list()
|
|
|
+ response_bytes = bytearray()
|
|
|
start_time = time.time()
|
|
|
while True:
|
|
|
b = self.serial.read(1)
|
|
|
if len(b):
|
|
|
- response_bytes.append(b)
|
|
|
+ new_byte = bytearray(b)
|
|
|
+ response_bytes.extend(new_byte)
|
|
|
elif time.time() - start_time > self.MB_TIMEOUT:
|
|
|
break
|
|
|
if len(response_bytes) == predicted_length:
|
|
|
break
|
|
|
if self.MB_DEBUG:
|
|
|
self.print_hex('Responce:', response_bytes)
|
|
|
-
|
|
|
+ return response_bytes
|
|
|
|
|
|
+ def communicate(self, request: bytes, predicted_length: int = -1) -> bytes:
|
|
|
+ """Send request and return rewponse after checksum check"""
|
|
|
+ pl = predicted_length + 2 if predicted_length != -1 else -1
|
|
|
+ response = self.raw_communicate(request + self._crc(request), pl)
|
|
|
+ if len(response) == 0:
|
|
|
+ raise NoResponseError('No response frome module')
|
|
|
+ crc_received = response[-2:]
|
|
|
+ crc_calculated = self._crc(response[:-2])
|
|
|
+ if crc_received != crc_calculated:
|
|
|
+ raise ChecksumError("CRC check dailed (got: 0x{}, calculated: 0x{})".format(
|
|
|
+ b2a_hex(crc_received).decode().upper(), b2a_hex(crc_calculated).decode().upper()))
|
|
|
+ return response
|
|
|
+
|
|
|
+ def mb_func(self, code: int, req: bytes, predicted_response_length: int = -1):
|
|
|
+ """Call typical MB function with address and function code check,
|
|
|
+ return just response, no header and checksum included"""
|
|
|
+ error = None
|
|
|
+ for i in range(self.MB_TRIES):
|
|
|
+ try:
|
|
|
+ pl = predicted_response_length + 2 if predicted_response_length != -1 else -1
|
|
|
+ response = self.communicate(b''.join((self.address.to_bytes(1, 'big'),
|
|
|
+ code.to_bytes(1, 'big'), req)), pl)
|
|
|
+ if response[0] != self.address:
|
|
|
+ raise MBError("Modbus address mismatch in request and response ({}/{})".format(
|
|
|
+ self.address, response[0]))
|
|
|
+ elif response[1] != code:
|
|
|
+ raise MBError("Modbus function opcode mismatch in request and response ({}/{})".format(
|
|
|
+ code, response[1]))
|
|
|
+ return response[2:-2]
|
|
|
+ except (MBError, ChecksumError, NoResponseError) as e:
|
|
|
+ if self.MB_DEBUG:
|
|
|
+ print(e)
|
|
|
+ error = e
|
|
|
+ time.sleep(0.5)
|
|
|
+ raise error
|
|
|
+
|
|
|
+ def _read_registers(self, opcode: int, address: int, count: int, raw: bool = False) -> Union[Sequence[int], bytes]:
|
|
|
+ """Read values of inpurt/holding registers from device"""
|
|
|
+ response = self.mb_func(opcode, address.to_bytes(2, 'big') + count.to_bytes(2, 'big'), 1 + 2*count)
|
|
|
+ try:
|
|
|
+ if raw:
|
|
|
+ return response[1:]
|
|
|
+ inp_count = response[0] // 2
|
|
|
+ return tuple(int.from_bytes(response[i:i + 2], 'big') for i in range(1, inp_count*2, 2))
|
|
|
+ except KeyError:
|
|
|
+ raise MBError("Incorrect response payload")
|
|
|
+
|
|
|
+ # 0x03
|
|
|
+ def read_holding_registers(self, address: int, count: int) -> Sequence[int]:
|
|
|
+ return self._read_registers(3, address, count)
|
|
|
|
|
|
def test_send(self, data: bytes):
|
|
|
while True:
|
|
@@ -95,11 +157,21 @@ class Modbus(ModbusMixin):
|
|
|
|
|
|
def main():
|
|
|
colorama.init()
|
|
|
- st = 'Hello world'
|
|
|
+ # st = 'Hello world'
|
|
|
# st.encode('utf-8')
|
|
|
|
|
|
- print(st.encode('utf-8'))
|
|
|
- # dev = Modbus('COM22', 115200, 1)
|
|
|
+ # print(st.encode('utf-8'))
|
|
|
+ dev = Modbus('COM22', 115200, 1)
|
|
|
+ to_send = b'\x01\x03\x01\x00\x00\x01'
|
|
|
+ dev.communicate(to_send, 7)
|
|
|
+
|
|
|
+ # Пример чтения одного регистра по адресу 0x0100
|
|
|
+ print(dev.read_holding_registers(0x0100, 1))
|
|
|
+ # dev._read_registers(0x03, 0x0100, 1, False)
|
|
|
+ # print(dev._read_registers(0x03, 0x0100, 1, True))
|
|
|
+
|
|
|
+
|
|
|
+ # dev.communicate()
|
|
|
# dev.test_send(bytes('hello world\r\n', encoding="utf_8"))
|
|
|
# tx_data = 'hello world\r\n'
|
|
|
# tx_data = 'abc'
|
|
@@ -117,4 +189,7 @@ def main():
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
main()
|
|
|
+ # a = 11
|
|
|
+ # print(a.to_bytes(2, 'big'))
|
|
|
+
|
|
|
|