| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430 | from io_module import IO_Modulefrom modbus import Modbusfrom log_reader import AnalogInputLogReaderimport coloramafrom colorama import Forefrom time import sleep, timefrom serial import Serialimport matplotlib.pyplot as plt import matplotlib.animation as animation from mb_registers import SysReg, AiRegimport structai_name = {'AIN_1': 0, 'AIN_2': 1, 'AIN_3': 2, 'AIN_4': 3, 'AIN_5': 4,            'AIN_6': 5, 'AIN_7': 6, 'AIN_8': 7, 'AIN_9': 8, 'AIN_10': 9,            'AIN_11': 10, 'AIN_12': 11, 'V_ISO_CL': 12, 'V_ISO': 13,           'CRNT_LIM_U_BFR_R': 14, 'CRNT_LIM_U_ABFR_R': 15,           'AIN_FIL_1': 0, 'AIN_FIL_2': 1, 'AIN_FIL_3': 2, 'AIN_FIL_4': 3,            'AIN_FIL_5': 4, 'AIN_FIL_6': 5, 'AIN_FIL_7': 6, 'AIN_FIL_8': 7,            'AIN_FIL_9': 8, 'AIN_FIL_10': 9, 'AIN_FIL_11': 10, 'AIN_FIL_12': 11           }class IO_AnalogInput(IO_Module):    GRAPTH_LEN = 100    def __init__(self, modbus: Modbus):        self.modbus = modbus        super().__init__(self.modbus)        self.log = AnalogInputLogReader(self.modbus)        self.fig = plt.figure(1)        self.input = self.fig.add_subplot(1, 1, 1)        self.input.set_title('input_1')        self.x = [0]        self.data = []        self.graph_input = ai_name['AIN_1']        self.graph_input_func = 0    # Чтение параметров    def get_inputs_state(self):        'Значения состояний входов вкл./выкл. (битовое поле)'        data = self.modbus.read_holding_registers(AiReg.IN_STATE.value, 1)        return format(data[0], '012b')    def get_inputs_mode(self):        'Режим измерения входов (0 - напряжение или 1 - ток. (битовое поле))'        data = self.modbus.read_holding_registers(AiReg.IN_MODE.value , 1)        return format(data[0], '012b')        def get_voltage_range(self):        'Диапазон измерения напряжения'        data = self.modbus.read_holding_registers(AiReg.IN_VOL_RANGE.value, 1)        return format(data[0], '012b')        def get_current_range(self):        'Диапазон измерения напряжения'        data = self.modbus.read_holding_registers(AiReg.IN_CUR_RANGE.value, 1)        return format(data[0], '012b')        def get_inputs_alarm(self):        'Аварии аналоговых входов (битовое поле)'        data = self.modbus.read_holding_registers(AiReg.IN_FAILURE.value, 1)        return format(data[0], '012b')    def get_ext_sens_power(self):        'Состояние питания внешних датчиков (вкл./выкл.)'        data = self.modbus.read_holding_registers(AiReg.EXT_SENS_POWER.value, 1)    def get_input_gain(self, channel):        'Коэффициент усиления канала [1..12] (внутренний коэф-т ADC)'        data = self.modbus.read_holding_registers(AiReg.IN_GAINE_FACTOR.value + channel - 1, 1)        return data[0]    def get_k_factor(self, channel):        'Коэффициент пересчета K канала [1..12]'        return self.modbus.read_float_holding(AiReg.IN_K_FACTOR.value + 2*(channel - 1))            def get_b_factor(self, channel):        'Коэффициент пересчета B канала [1..12]'        return self.modbus.read_float_holding(AiReg.IN_B_FACTOR.value + 2*(channel - 1))    def get_raw_input(self, channel):        'Сырые данные по отдельному каналу [1..12]'        return self.modbus.read_holding_registers(AiReg.IN_RAW.value + channel - 1, 1)[0]    def get_raw_inputs(self):        'Сырые данные по всем каналам'        data = self.modbus.read_holding_registers(AiReg.IN_RAW.value, 16)        return data    def get_fil_inputs(self):        'Фильтрованные данные'        start_time = time()        data = self.modbus.read_holding_registers_raw(AiReg.IN_FILTER.value, 24)        return struct.unpack('>ffffffffffff', data)        # print(f'Request time: {time() - start_time}')        # return struct.unpack('>f', self.read_holding_registers_raw(address, 2))[0]        # data = []        # for i in range(12):        #     data.append(self.modbus.read_float_holding(AiReg.IN_FILTER.value + i*2))        # return data    def get_presets_state(self):        'Значения состояний уставок вкл./выкл. (битовое поле)'        data = self.modbus.read_holding_registers(AiReg.PR_STATE.value, 1)        return format(data[0], '012b')    def get_preset_min(self, channel):        'Минимальное значение уставки'        return self.modbus.read_float_holding(AiReg.PR_MIN.value + 2*(channel - 1))    def get_presets_min(self):        'Минимальное значение уставок по всем каналам'        data = struct.unpack('>ffffffffffff', self.modbus.read_holding_registers_raw(AiReg.PR_MIN.value, 24))        return data    def get_preset_max(self, channel):        'Маскимальное значение уставки'        return self.modbus.read_float_holding(AiReg.PR_MAX.value + 2*(channel - 1))    def get_presets_max(self):        'Максимальное значение уставок по все каналам'        return struct.unpack('>ffffffffffff', self.modbus.read_holding_registers_raw(AiReg.PR_MAX.value, 24))         def get_preset_hist(self, channel):        'Значение гистерезиса уставки'        return self.modbus.read_float_holding(AiReg.PR_HIST.value + 2*(channel - 1))    def get_presets_hist(self):        '''Значение гистерезисов уставок'''        return struct.unpack('>ffffffffffff', self.modbus.read_holding_registers_raw(AiReg.PR_HIST.value, 24))        # Установка параметров    def set_inputs_state(self, val):        'Значения состояний входов вкл./выкл. (битовое поле)'        self.modbus.write_holding_register(AiReg.IN_STATE.value , val)    def set_inputs_mode(self, val):        'Режим измерения входов (0 - напряжение или 1 - ток. (битовое поле))'        self.modbus.write_holding_register(AiReg.IN_STATE.value, val)    def set_voltage_range(self, val):        'Диапазон измерения напряжения (0 - диапазон 0 - 10 В, 1 - диапазон 0 - 1 В (битовое поле))'        self.modbus.write_holding_register(AiReg.IN_VOL_RANGE.value, val)    def set_current_range(self, val):        'Диапазон измерения тока (0 - диапазон 2- 20 мА, 1 - диапазон 0 - 20 мА (битовое поле))'        self.modbus.write_holding_register(AiReg.IN_CUR_RANGE.value, val)    def set_ext_sens_power(self, val):        'Состояние питания внешних датчиков (вкл./выкл.)'        self.modbus.write_holding_register(AiReg.EXT_SENS_POWER.value , val)    def set_input_gain(self, input, value):        '''        Коэффициент усиления канала [1..12] (внутренний коэф-т ADC)        Допустимые значения: 1, 2, 4, 8, 16, 32, 64, 128        '''        if value not in (1, 2, 4, 8, 16, 32, 64, 128):            return None        self.modbus.write_holding_register(AiReg.IN_GAINE_FACTOR.value + input - 1, value)    def set_k_factor(self, channel, value):        'Коэффициент пересчета K канала [1..12]'        self.modbus.write_float(AiReg.IN_K_FACTOR.value + 2*(channel - 1), value)    def set_b_factor(self, channel, value):        'Коэффициент пересчета K канала [1..12]'        self.modbus.write_float(AiReg.IN_B_FACTOR.value + 2*(channel - 1), value)    def set_presets_state(self, value):        'Значения состояний входов вкл./выкл. (битовое поле)'        self.modbus.write_holding_register(AiReg.PR_STATE.value , value)    def set_preset_min(self, channel, value):        'Минимальное значение уставки на отдельный канал'        self.modbus.write_float(AiReg.PR_MIN.value + 2*(channel - 1), value)    def set_preset_max(self, channel, value):        'Максимальное значение уставки на отдельный канал'        self.modbus.write_float(AiReg.PR_MAX.value + 2*(channel - 1), value)    def set_preset_hyst(self, channel, value):        'Значение гистерезиса уставки на отдельный канал'        self.modbus.write_float(AiReg.PR_HIST.value + 2*(channel - 1), value)    '''Настройки входов'''    def print_inputs(self):        print(Fore.GREEN + '____________________________________________')        print(Fore.GREEN + 'Analog inputs settings:')        # Значения состояний входов вкл./выкл. (битовое поле)        print('Inputs state [bit field]         :', Fore.GREEN + self.get_inputs_state())        # Режим измерения входов напряжение или ток. (битовое поле)        print('Inputs mode [bit field]          :', Fore.GREEN + self.get_inputs_mode())        # Диапазон измерения напряжения (битовое поле)        print('Inputs voltage range [bit field] :', Fore.GREEN + self.get_voltage_range())        # Диапазон измерения тока (битовое поле)        print('Inputs current range [bit field] :', Fore.GREEN + self.get_current_range())        # Уставки        print('Inputs presets [bit field]       :', Fore.GREEN + self.get_presets_state())        # Нижнее значение уставки        data = self.get_presets_min()        print('Inputs preset min: ', Fore.GREEN + f'1: {data[0]}, 2: {data[1]}, 3: {data[2]}, 4: {data[3]}, 5: {data[4]}, 6: {data[5]}')        print('Inputs preset min: ', Fore.GREEN + f'7: {data[6]}, 8: {data[7]}, 9: {data[8]}, 10: {data[9]}, 11: {data[10]}, 12: {data[11]}')        # Верхнее значение уставок        data = self.get_presets_max()        print('Inputs preset max: ', Fore.GREEN + f'1: {data[0]}, 2: {data[1]}, 3: {data[2]}, 4: {data[3]}, 5: {data[4]}, 6: {data[5]}')        print('Inputs preset max: ', Fore.GREEN + f'7: {data[6]}, 8: {data[7]}, 9: {data[8]}, 10: {data[9]}, 11: {data[10]}, 12: {data[11]}')        # Гистерезис        data = self.get_presets_hist()        print('Input preset hyst: ', Fore.GREEN + f'1: {data[0]}, 2: {data[1]}, 3: {data[2]}, 4: {data[3]}, 5: {data[4]}, 6: {data[5]}')        print('Input preset hyst: ', Fore.GREEN + f'7: {data[6]}, 8: {data[7]}, 9: {data[8]}, 10: {data[9]}, 11: {data[10]}, 12: {data[11]}')        # Коэффициенты усиления        # for i in range(1, 13):        #     print(f'Gain factor channel {i}    : ', end='')        #     print(Fore.GREEN + str(self.get_input_gain(i)))    '''Вывод параметров. Сырые данные 16 входов'''    def print_raw_inputs(self):        data = self.get_raw_inputs()        print(f"[ADC raw] IN_1: {data[0]}, IN_2: {data[1]}, IN_3: {data[2]}, IN_4: {data[3]}")        print(f"[ADC raw] IN_5: {data[4]}, IN_6: {data[5]}, IN_7: {data[6]}, IN_8: {data[7]}")        print(f"[ADC raw] IN_9: {data[8]}, IN_10: {data[9]}, IN_11: {data[10]}, IN_12: {data[11]}")        print(f"[ADC raw] V_ISO_CL: {data[12]}, V_ISO: {data[13]}")        print(f"[ADC raw] CRNT_LIM_U_BFR_R: {data[14]}, CRNT_LIM_U_ABFR_R: {data[15]}")    '''Вывод параметров. Фильтрованные данные 12 входов'''    def print_filtered_inputs(self):        data = self.get_fil_inputs()        print(f"[ADC fil] IN_1: {data[0]}, IN_2: {data[1]}, IN_3: {data[2]}, IN_4: {data[3]}")        print(f"[ADC fil] IN_5: {data[4]}, IN_6: {data[5]}, IN_7: {data[6]}, IN_8: {data[7]}")        print(f"[ADC fil] IN_9: {data[8]}, IN_10: {data[9]}, IN_11: {data[10]}, IN_12: {data[11]}")    '''Вывод данных на график'''    def show_graph(self, func, channel: str):        self.graph_input_func = func        self.graph_input = ai_name[channel]        ani = animation.FuncAnimation(self.fig, self.draw, interval=50)        plt.show()    def show_graph_filtered(self, channel: str):        self.graph_input = ai_name[channel]    def draw(self, i):        data = self.graph_input_func()        self.data.append(data[self.graph_input])         self.input.clear()        self.input.plot(self.x, self.data)        if len(self.data) == self.GRAPTH_LEN:            self.data.pop(0)            self.x.pop(0)        self.x.append(self.x[-1] + 1)        # print(self.in1)        # print(self.x)                def main():    colorama.init(autoreset=True)        serial_port = Serial('COM10', 115200, timeout=0.05, parity='N', xonxoff=False)        modbus_tester = Modbus(serial_port, 1)    modbus_tester.MB_DEBUG = False    # dev_tester = IO_Digital(modbus_tester)    ai = IO_AnalogInput(modbus_tester)     '''Режим работы аналоговых входов'''    # ai.set_inputs_state(0b1111_1111_1111)    # ai.print_inputs()            # ai.show_graph(ai.get_fil_inputs, 'AIN_FIL_1')    '''Уставки'''    # ai.set_presets_state(0b1111_1111_1111)    # for i in range(1, 13):    #     ai.set_preset_max(i, 18.5)    #     ai.set_preset_min(i, 2)    #     ai.set_preset_hyst(i, 1)        '''Данные входов'''    # while True:    #     print(ai.get_fil_inputs())    #     sleep(1)    # ai.set_voltage_range(0b00_0000_0000)    # ai.set_current_range(0b00_0000_0000)    # ai.print_inputs()    '''Системное'''    print(ai.sys.get_uptime())    # ai.sys.save_settings()    '''Установка коэффициентов усиления. Канал, коэффициент'''    '''    for channel in range(1, 13):        ai.set_input_gain(channel, 8)    for channel in range(1, 13):        print(ai.get_input_gain(channel))    '''    '''Коэффициенты K и B'''    # for i in range(1, 13):    #     ai.set_k_factor(i, 1.0)    #     ai.set_b_factor(i, 0.0)    # ai.sys.set_system_vars(1234)    # ai.sys.get_system_vars()    # print(ai.sys.get_bat_votage())    # ai.sys.set_rtc()        # ai.sys.set_system_vars(1234)    # ai.sys.get_system_vars()    # print(ai.sys.get_rtc())             # print(ai.get_inputs_state())    # ai.set_inputs_state(0b1111_1111_1111)    # print(ai.sys.get_module_state())    # ai.set_inputs_state(0b0000_0000_0001)    # ai.set_inputs_state(0b1111_1111_1111)    # print(ai.get_inputs_state())        # sleep(1)    # ai.set_inputs_state(0b0)    # print(ai.get_inputs_mode())    # ai.set_inputs_mode(0b0000_0000_0000)    # print(ai.get_inputs_mode())    '''Питание внешних датчиков'''    # ai.set_ext_sens_power(0)    '''Аварии аналоговых входов'''    # for i in range(100):    #     print(ai.get_inputs_alarm())    #     sleep(1)    '''Данные каналов'''    """    while True:        # ai.get_raw_inputs()        ai.get_fil_inputs()        # ai.print_raw_inputs()        # ai.print_filtered_inputs()        sleep(0.1)    """    # ai.get_raw_inputs()    # ai.print_raw_inputs()    '''Сырые данные'''    # while True:    #     print(ai.get_raw_input(12))    #     sleep(1)    '''Вывод на график. Сырые данные'''    # ai.show_graph(ai.get_raw_inputs,  'AIN_1')    '''Вывод на график. Фильтрованные данные'''    # ai.show_graph(ai.get_fil_inputs, 'AIN_FIL_12')    '''Системные настройки'''    # ai.sys.get_system_vars()    '''Сохранение настроек'''    # ai.sys.save_sattings()    '''Обновление прошивки'''    # serial_port.timeout = 1    # modbus_tester.MB_DEBUG = True    # ai.updater.update('fw.bin', 'MAI_12')    if __name__ == '__main__':    main()
 |