|
@@ -1,6 +1,7 @@
|
|
|
import time
|
|
|
from serial import Serial
|
|
|
import colorama
|
|
|
+import struct
|
|
|
from colorama import Fore, Style
|
|
|
from typing import Sequence, Union
|
|
|
from binascii import b2a_hex
|
|
@@ -50,7 +51,7 @@ class Modbus(ModbusMixin):
|
|
|
# MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
|
|
|
MB_TIMEOUT: float = 0.05
|
|
|
MB_CRC_TABLE: Sequence[int] = DEFAULT_MB_CRC_TABLE
|
|
|
- MB_DEBUG: bool = True
|
|
|
+ MB_DEBUG: bool = False
|
|
|
MB_TRIES: int = 3
|
|
|
|
|
|
def __init__(self, tty: str, brate: int, address: int):
|
|
@@ -140,6 +141,28 @@ class Modbus(ModbusMixin):
|
|
|
def read_holding_registers(self, address: int, count: int) -> Sequence[int]:
|
|
|
return self._read_registers(3, address, count)
|
|
|
|
|
|
+ def read_holding_registers_raw(self, address: int, count: int) -> bytes:
|
|
|
+ return self._read_registers(3, address, count, True)
|
|
|
+
|
|
|
+ def read_uint32_holding(self, address: int) -> int:
|
|
|
+ """Read 32-bit integer from holding registers"""
|
|
|
+ return struct.unpack('>I', self.read_holding_registers_raw(address, 2))[0]
|
|
|
+
|
|
|
+ # 0x10
|
|
|
+ def write_holding_registers_raw(self, address:int, values: bytes):
|
|
|
+ """Write 16-bit integers to holding registers on device"""
|
|
|
+ request = bytearray(address.to_bytes(2, 'big'))
|
|
|
+ request += (len(values) // 2).to_bytes(2, 'big')
|
|
|
+ request += len(values).to_bytes(1, 'big')
|
|
|
+ request += values
|
|
|
+ response = self.mb_func(16, bytes(request), 4)
|
|
|
+ if request[:4] != response:
|
|
|
+ raise MBError('Incorrect response payload')
|
|
|
+
|
|
|
+ def write_uint32(self, address: int, value: int):
|
|
|
+ """Write 32-bit integer to holding register"""
|
|
|
+ self.write_holding_registers_raw(address, struct.pack('>I', value))
|
|
|
+
|
|
|
def test_send(self, data: bytes):
|
|
|
while True:
|
|
|
self.serial.write(data)
|
|
@@ -156,42 +179,17 @@ class Modbus(ModbusMixin):
|
|
|
# time.sleep(1)
|
|
|
|
|
|
def main():
|
|
|
+ ADDR = 1
|
|
|
colorama.init()
|
|
|
- # st = 'Hello world'
|
|
|
- # st.encode('utf-8')
|
|
|
-
|
|
|
- # print(st.encode('utf-8'))
|
|
|
- dev = Modbus('COM22', 115200, 1)
|
|
|
- # to_send = b'\x01\x03\x01\x00\x00\x01'
|
|
|
- # dev.communicate(to_send, 7)
|
|
|
-
|
|
|
- # Пример чтения одного регистра по адресу 0x0100
|
|
|
- while True:
|
|
|
- print(dev.read_holding_registers(0x0100, 1))
|
|
|
- time.sleep(1)
|
|
|
- # dev._read_registers(0x03, 0x0100, 1, False)
|
|
|
- # print(dev._read_registers(0x03, 0x0100, 1, True))
|
|
|
+ dev = Modbus('COM22', 115200, ADDR)
|
|
|
|
|
|
-
|
|
|
- # dev.communicate()
|
|
|
- # dev.test_send(bytes('hello world\r\n', encoding="utf_8"))
|
|
|
- # tx_data = 'hello world\r\n'
|
|
|
- # tx_data = 'abc'
|
|
|
- # dev.send_recv(bytes(tx_data, encoding="utf_8"), len(tx_data))
|
|
|
-
|
|
|
- # Modbus.test()
|
|
|
-
|
|
|
- # print(i.to_bytes(2, 'big'))
|
|
|
- # print(Modbus.crc(i.to_bytes(1, 'big')))
|
|
|
- # print(Modbus.crc(b'\x00\x01'))
|
|
|
- # crc = Modbus.crc(b'\x00\x01')
|
|
|
- # dev.raw_communicate(b'\x00\x01')
|
|
|
- # print(ModbusMixin.print_hex(crc))
|
|
|
- # print(Modbus.crc(b'\xd0\x00'))
|
|
|
+
|
|
|
+ # print(dev.read_holding_registers(0x0100, 1))
|
|
|
+ # print(dev.read_uint32_holding(0x0100))
|
|
|
+ # time.sleep(1)
|
|
|
+
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
main()
|
|
|
- # a = 11
|
|
|
- # print(a.to_bytes(2, 'big'))
|
|
|
|
|
|
|