from socket import * from utils import * from sys import argv import time import colorama from colorama import Fore, Back, Style import os TCP_PORT = 80 HEAD = 'POST /api.php HTTP/1.1\r\nHost: 192.168.10.10' REQ_ID = {'getSpState':'7000000', 'getPpRegs':'3000000', 'getPp':'4000000', 'getHv':'8000000', 'getIp': '9000000', 'getPreset': '10000000', 'getPwr':'5000000', 'getMonRegs':'2000000', 'getMon':'1000000', 'getInfo':'11000000', 'getSp':'7000000', 'setSys':'7000000', 'setSpState': '7000000', 'setUpdate': '99999', 'getUpdate': '99999'} time_request = TimeIt() time_connect = TimeIt() error_no_answer = 0 erorr_brackets = 0 error_connection = 0 class SBS_client: def __init__(self, ip='192.168.10.10', port=80, debug=False): self.DEBUG = debug self.ip = ip self.HEAD = 'POST /api.php HTTP/1.1\r\nHost: ' + self.ip + ':' \ + str(port) + '\r\nContent-Length: ' self.addr = (ip, int(port)) def send_recv(self, data): data = str.encode(data) try: self.tcp_socket.send(data) except: pass rec = bytes() while True: try: chank = self.tcp_socket.recv(8000) rec += chank if not chank: break except: break if len(rec) == 0: error_no_answer += 1 # Проверяем ответ на наличие открывающих и закрывающих скобок result_str = rec.decode() open_brackets = result_str.count('{') close_brackets = result_str.count('}') if (open_brackets != close_brackets) or open_brackets == 0: erorr_brackets += 1 if self.DEBUG: print(Fore.YELLOW + 'Received answer:') print(Fore.GREEN + rec.decode()) @timeit(time_request) def http_request(self, method, params): # foo = '{"jsonrpc":"2.0","method":"' + method + '","params":' + \ # params + ',"id":' + REQ_ID[method] + '}' # request = self.HEAD + str(len(foo)) + '\r\n\r\n' + foo # if self.DEBUG: # print(Fore.YELLOW + 'Send HTTP request:') # print(Fore.BLUE + request) if self.connect(): # self.send_recv(request) self.close() @timeit(time_connect) def connect(self): self.tcp_socket = socket(AF_INET, SOCK_STREAM) self.tcp_socket.settimeout(15.0) try: self.tcp_socket.connect(self.addr) except: print("Ошибка соединения") error_connection += 1 return False return True def close(self): self.tcp_socket.close() # --------------------------------------------------------------------- # Обновление def set_update(self): params = '{"spID":"0", "update":{"serverIp":"172.16.2.8:9000","fileName":"/dir/fw.bin"}}' # params = '{"spID":"0", "update":{"serverIp":"172.16.2.8:9000","fileName":"E:\Greenstar\nSBS-ethernet-prime\bin\fw.bin"}}' self.http_request('setUpdate', params) def get_update(self): params = '{"spID":"0"}' self.http_request('getUpdate', params) # --------------------------------------------------------------------- # Уставки # setPreset def set_preset(self): self.send_recv('POST /api\r\n\r\n{{"id":1,"jsonrpc":2.0,"method":\ "setPreset","params":{"mode":"manual"}}') # --------------------------------------------------------------------- # Отправка группы команд в начале работы с прибором def startup(self): self.http_request('getSpState', 'null') self.http_request('getPpRegs', 'null') self.http_request('getPp', 'null') self.http_request('getHv', 'null') self.http_request('getIp', 'null') self.http_request('getPreset', 'null') self.http_request('getPwr', 'null') self.http_request('getMonRegs', 'null') self.http_request('getMon', 'null') self.http_request('getInfo', 'null') # Периодический опрос прибора def periodic_request(self, count=0): def procedure(): self.http_request('getMon', 'null') time.sleep(1) self.http_request('getSpState', 'null') time.sleep(1) if count == 0: while True: procedure() else: for _ in range(count): procedure() # Включить демо-данные def set_demo_data(self): self.http_request('setSys', '{"demo":true}') # Управление набором спектра # start/stop/clear def set_spectrum_state(self, state: str): foo = '{"acqState":"' + state + '"}' self.http_request('setSpState', foo) self.http_request('getPp', 'null') # Запрос спектра def get_spectrum_data(self): # while True: # if self.connect(): # self.close() for i in range(20): self.http_request('getSp', 'null') time.sleep(0.1) # self.http_request('getMon', 'null') # time.sleep(0.05) self.info() def info(self): print("\033[H\033[J", end="") print("Время соединения :", round(time_connect.cur_time, 5), round(time_connect.max_time, 5)) print("Время запроса-ответа:", round(time_request.cur_time, 5), round(time_request.max_time, 5)) print("Ошибки. Скобки:", erorr_brackets) print("Ошибки. Пустой ответ:", error_no_answer) print("Ошибки. Таймаут соединения:", error_connection) # Тесты def processing(ip, port): colorama.init(autoreset=True) client = SBS_client(ip, port) client.DEBUG = False # client.startup() # # usb_client.periodic_request(5) # client.set_demo_data() # client.set_spectrum_state('start') client.get_spectrum_data() def update(): colorama.init(autoreset=True) dev = SBS_client('172.16.2.101', 80) dev.set_update() # dev.get_update() def test_time(): print(time.time()) time.sleep(1) print(time.time()) def main(): # test_time() '''Получение данных спектра''' processing(argv[1], argv[2]) '''Обновление FW''' # update() # client.set_update() # client.get_update() # print('{"spID":"0", "update":{"serverIp":"192.168.0.25:80","fileNmae":"полный/путь/к/файлу.bin"}}') ''' if len(argv) == 2: print("Укажите IP-адрес и порт устройства") else: colorama.init(autoreset=True) ''' if __name__ == "__main__": main()