| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461 | from io_module import IO_Module, SysUtilsfrom modbus import Modbus, MBError, NoResponseErrorfrom log_reader import DigitalLogReaderimport coloramafrom colorama import Forefrom time import sleepimport timefrom datetime import datetime, timedelta, timezonefrom mb_registers import DIO_REGS, DioRegfrom serial import Serialclass IO_Digital(IO_Module):    def __init__(self, modbus: Modbus):        self.modbus = modbus        super().__init__(self.modbus)        self.log = DigitalLogReader(self.modbus)      '''Запрос параметров дискретных входов'''    # 0x0100 - Состояние входов (вкл./выкл.)    def get_inputs_state(self) -> str:        data = self.modbus.read_holding_registers(DioReg.IN_STATE.value , 1)        return format(data[0], '08b')    # 0x0101 - текущее состояние входов    def get_inputs_bit(self) -> str:        data = self.modbus.read_holding_registers(DioReg.IN_BITS.value , 1)        return format(data[0], '08b')    # 0x0101 - 0x0110 Счетчики импульсов    def get_inputs_counters(self):        data = []        first_reg = DioReg.IN_CNT.value        for i in range(first_reg, first_reg + 16, 2):            data.append(self.modbus.read_uint32_holding(i))        return data    # 0x0120 - режим работы входов    def get_inputs_mode(self):        data = self.modbus.read_holding_registers(DioReg.IN_MODE.value, 1)        return format(data[0], '08b')    # 0x0122 - нормальное состояние входов    def get_inputs_norm_state(self):        data = self.modbus.read_holding_registers(DioReg.IN_NORM.value, 1)        return format(data[0], '08b')    # 0x0124 - время антидребезга отдельного входа (ms)    def get_debounce_channel(self, input):        data = self.modbus.read_holding_registers(DioReg.IN_DEBOUNCE.value + input - 1, 1)        return data[0]    # 0x0124 - время антидребезга всех входов (ms)    def get_debounce_channels(self):        return self.modbus.read_holding_registers(DioReg.IN_DEBOUNCE.value, 8)    '''Запрос датчиков обрыва нагрузки'''    # 0x0130    def get_loads_bit(self):        data = self.modbus.read_holding_registers(DIO_REGS['load_bits'], 1)        return format(data[0], '08b')    # 0x0131    def get_cred_bit(self):        data = self.modbus.read_holding_registers(DIO_REGS['cred_bits'], 1)        return format(data[0], '08b')        '''Установка параметров дискретных входов'''    # 0x0100 - Состояние входов (вкл./выкл.)    def set_inputs_state(self, val):        self.modbus.write_holding_register(DioReg.IN_STATE.value, val)    # 0x0120 - Режим работы всех входов    def set_inputs_mode(self, val):        self.modbus.write_holding_register(DioReg.IN_MODE.value, val)    # 0x0120 - Режим работы отдельного входа    def set_input_mode(self, input, val):        ret = self.modbus.read_holding_registers(DioReg.IN_MODE.value, 1)        if val == 1:            data = ret[0] | (0b1 << (input - 1))        else:            data = ret[0] & ~(0b1 << (input - 1))        self.set_inputs_mode(data)    def get_load_err(self):        'Состояние датчиков обрыва нагрузки'        data = self.modbus.read_holding_registers(DioReg.LOAD_ERR.value, 1)        return format(data[0], '08b')    def get_outputs_err(self):        'Неисправность реле'        data = self.modbus.read_holding_registers(DioReg.OUT_ERR.value, 1)        return format(data[0], '08b')    '''Запрос параметров дискретных входов'''        def get_outputs_state(self):        'Состояние выходов (вкл./выкл.)'        data = self.modbus.read_holding_registers(DioReg.OUT_STATE.value, 1)        return format(data[0], '08b')    def get_outputs_state_save(self):        'Состояние выходов в безопасном режиме (вкл./выкл.)'        data = self.modbus.read_holding_registers(DioReg.OUT_STATE_SAVE.value, 1)        return format(data[0], '08b')    def get_outputs(self):        'Текущее состояние выходов'        data = self.modbus.read_holding_registers(DioReg.OUT_BITS.value, 1)        return format(data[0], '08b')    def get_outputs_save(self):        'Текущее состояние выходов в безопасном режиме'        data = self.modbus.read_holding_registers(DioReg.OUT_BITS_SAVE.value, 1)        return format(data[0], '08b')    def get_outputs_mode(self):        'Режим работы выходов'        data = self.modbus.read_holding_registers(DioReg.OUT_MODE.value, 1)        return format(data[0], '08b')        def get_outputs_mode_save(self):        'Режим работы выходов в безопасном режиме'        data = self.modbus.read_holding_registers(DioReg.OUT_MODE_SAVE.value, 1)        return format(data[0], '08b')        '''Установка параметров дискретных выходов'''    def set_outputs_state(self, val):        'Состояние выходов вкл./выкл. (битовое поле)'        self.modbus.write_holding_register(DioReg.OUT_STATE.value, val)    def set_output_state(self, output, val):        'Состояние выхода вкл./выкл.'        ret = self.modbus.read_holding_registers(DioReg.OUT_STATE.value, 1)[0]        self.set_outputs_state(SysUtils.set_bit(ret, output - 1, val))    def set_outputs_state_save(self, val):        'Состояние выходов вкл./выкл. в безопасном режиме (битовое поле)'        self.modbus.write_holding_register(DioReg.OUT_STATE_SAVE.value, val)        def set_outputs(self, val):        'Текущее состояние выходов в обычном режиме'        self.modbus.write_holding_register(DioReg.OUT_BITS.value, val)        def set_output(self, output, val):        'Установить значение на конкретном выходе [1..8]'        ret = self.modbus.read_holding_registers(DioReg.OUT_BITS.value, 1)[0]        self.set_outputs(SysUtils.set_bit(ret, output - 1, val))    def set_outputs_save(self, val):        'Текущее состояние выходов в обычном режиме'        self.modbus.write_holding_register(DioReg.OUT_BITS_SAVE.value, val)        def set_output_save(self, output, val):        'Установить значение на конкретном выходе [1..8]'        ret = self.modbus.read_holding_registers(DioReg.OUT_BITS_SAVE.value, 1)        self.set_outputs_save(SysUtils.set_bit(ret, output - 1, val))    def set_outputs_mode(self, val):        'Режим работы выходов (битовое поле)'        self.modbus.write_holding_register(DioReg.OUT_MODE.value, val)    def set_output_mode(self, output, val):        'Установить режим работы конкретного выхода [1..8]'        ret = self.modbus.read_holding_registers(DioReg.OUT_MODE.value, 1)[0]        self.set_outputs_mode(SysUtils.set_bit(ret, output - 1, val))            def set_outputs_mode_save(self, val):        'Режим работы выходов в безопасном режиме (битовое поле)'        self.modbus.write_holding_register(DioReg.OUT_MODE_SAVE.value, val)    def set_output_mode_save(self, output, val):        'Установить режим работы конкретного выхода [1..8] в безопамном режиме'        ret = self.modbus.read_holding_registers(DioReg.OUT_MODE_SAVE.value, 1)[0]        self.set_outputs_mode_save(SysUtils.set_bit(ret, output - 1, val))    def print_inputs(self):        # Состояние входов (вкл./выкл.)        print('Inputs state [bit field]  :', Fore.GREEN + self.get_inputs_mode())        # Значения входов (битовое поле)        print('Inputs values [bit field] :', Fore.GREEN + self.get_inputs_bit())        # Значение счетчиков        data = self.get_inputs_counters()        print('Inputs counters           :', Fore.GREEN + ' | '.join(str(el) for el in data))        # Режим работы входов (битовое поле)        print('Inputs mode [bit field]   :', Fore.GREEN + self.get_inputs_mode())        # Нормальное состояние входов (битовое поле)        print('Inputs norm [bit field]   :', Fore.GREEN + self.get_inputs_norm_state())         # Период антидребезга (ms)        print('Debounce input (ms)       :', Fore.GREEN + ' | '.join(str(el) for el in self.get_debounce_channels()))            def print_loads(self):        # Значения датчиков нагрузки (битовое поле)        print('Loads values [bit field] :', Fore.GREEN + self.get_loads_bit())        # Слово достоверности датчиков нагрузки (битовое поле)        print('Credibility loads [bit field] :', Fore.GREEN + self.get_cred_bit())class IO_DigitalTester:    def __init__(self, dev_tester: IO_Digital, dev_dut: IO_Digital):        self.tester = dev_tester        self.dut = dev_dut    '''Управление DUT'''    # Подать/снять питание на DUT    def dut_switch(self, state: bool):        self.tester.set_output(1, state)    # Установить 1 на нечетных входах DUT    def dut_set_odd_inputs(self):        self.tester.set_output(2, False)    # Установить 1 на четный входах DUT    def dut_set_even_imputs(self):        self.tester.set_output(2, True)    '''Тест входов'''    def test_inputs(self):        self.dut_set_even_imputs()        time.sleep(0.1)        self.dut.print_inputs()        time.sleep(1)        self.dut_set_odd_inputs()        time.sleep(0.1)        self.dut.print_inputs()        time.sleep(1)    '''Тест выходов и датчика обрыва нагрузки'''    def test_load(self):        # Все выходы DUT разомкнуты        print("Все выходы DUT разомкнуты...")        self.dut.set_outputs(0)        self.dut.print_loads()        time.sleep(0.1)        # Замкнуть все выходы DUT (лампочка должна гореть)        print("Все выходы DUT замкнуты...")        self.dut.set_outputs(0b11111111)        time.sleep(0.1)                for i in range(1, 9):            # Разомкнуть i-ый выход DUT            print(f"Разомкнуть выход {i}")            self.dut.set_output(i, 0)            time.sleep(1)            self.dut.print_loads()            self.dut.set_output(i, 1)            time.sleep(1)    def get_load(self):        self.dut.set_outputs(0b11110111)        # self.dut.set_outputs(0)        time.sleep(1)        self.dut.print_loads()def main():    colorama.init(autoreset=True)        serial_port = Serial('COM11', 115200, timeout=0.05, parity='N', xonxoff=False)        modbus_tester = Modbus(serial_port, 1)    modbus_tester.MB_DEBUG = False    # dev_tester = IO_Digital(modbus_tester)    dio = IO_Digital(modbus_tester)    # dio.sys.set_save_mode(0)    # dio.set_outputs_state(0b1111_1111)    # dio.set_inputs_state(0b1111_1111)    # dio.set_inputs_state(0b0000_0000)    for addr in range(1, 7):        modbus_tester.address = addr        dio.sys.set_save_mode(0)        dio.set_outputs_state(0b1111_1111)        dio.set_outputs(0b1111_1111)            '''Тесты отдельного модуля DIO'''    # dio.sys.get_system_vars()    # print(dio.sys.get_save_mode())    # print(dio.sys.get_save_delay())        '''Сохранить информацию о модуле'''    # dio.sys.set_info('This is supe r DIO!')    # dio.print_inputs()    # dio.get_inputs_counters()    # modbus_dut = Modbus(serial_port, 2)    # dev_dut = IO_Digital(modbus_dut)    # tester = IO_DigitalTester(dev_tester, dev_dut)    '''Включить DUT'''    # tester.dut_switch(True)    '''Запросить системные настройки DUT'''    # dio.sys.get_system_vars()    '''Тест входов. Переключение значений на входах DUT'''    # for i in range(10):    #     tester.test_inputs()    '''Тестирование выходов и датчиков обрыва нагрузки'''    # tester.test_load()    # for i in range(100):    #     tester.get_load()    '''Выходы'''    # print(dio.get_outputs())        '''~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'''    '''             Выходы              '''    '''Выходы вкл/выкл'''    # print(dio.get_outputs_state())    # dio.set_outputs_state(0b1001_0000)    # print(dio.get_outputs_state())        # print(dio.get_outputs_state_save())    # dio.set_outputs_state_save(0b0001_1001)    # print(dio.get_outputs_state_save())    '''Режим работы'''    # print(dio.get_outputs_mode())    # print(dio.get_outputs_mode_save())    # dio.set_outputs_state(0b0000_0001)    # dio.set_outputs_mode(0b0000_0000)    # dio.set_outputs(0b0000_0001)    # print(dio.get_outputs_mode())        '''Текущие значения'''    # print(dio.get_outputs())    # print(dio.get_outputs_save())    # dio.set_outputs_state(0b1000_0000)    # dio.set_output_mode(8, 1)    # dio.set_outputs(0b1000_0000)    # dio.set_outputs_state_save(0b0000_0000)    # dio.set_outputs_save(0b0000_0000)    # dio.set_outputs_state_save(0b1000_0000)    # dio.set_output_mode_save(8, 0)    # dio.set_outputs_save(0b1000_0000)    # print(dio.get_inputs_mode())    # for i in range(100):    #     dio.set_outputs(0b00000001)    #     sleep(1)    #     dio.set_outputs(0b00000000)    #     sleep(1)    '''Системные переменные и параметры'''    # dio.sys.get_system_vars()    # print(dio.sys.get_bat_votage())    '''Безопасный режим'''    # print(dio.sys.get_save_delay())    # print(dio.sys.get_save_mode())    # dio.sys.set_save_mode(0)    # print(dio.sys.get_save_mode())    # dio.sys.set_save_delay(5)    ''' Установить текущее время с учетом часового пояса'''    # print(dio.sys.get_rtc())        # dio.sys.get_system_vars()    '''Лог и архив. Настройки лога.'''    # dev.log.get_log_info()    # dev.log.get_random_entries()    # for i in range(8):    #     dev.log.set_archive_period(5 + i*2, i)        # dev.log.get_log_info()    # dev.log.get_all_archive()    # dev.log.get_all_log()        # dev.log.log_clear()    # dev.log.archive_clear()    '''Лог'''    # dev.log.get_all_log()    '''Архив'''    # dev.log.get_all_archive()    # print(dev.log.get_archive_entry(0, 1))    '''Сохранение настроек'''    # dio.sys.save_sattings()    '''Настройки'''    '''Регистры модуля'''    # dev.print_inputs()    '''Аварии'''    # print(dio.get_load_err())    # print(dio.get_outputs_err())    '''Настройи модуля'''    # for i in range(1, 9):    #     dev.set_input_mode(i, 1)        # print('Inputs mode [bit field]   :', Fore.GREEN + dev.get_inputs_mode())    # dev.set_input_mode(4, 1)    # dev.print_inputs()    '''Обновление'''    # serial_port.timeout = 1    # modbus_tester.MB_DEBUG = True    # dio.updater.update('fw.bin', 'MDIO_88')        if __name__ == '__main__':    main()
 |