123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- from socket import *
- from utils import *
- from sys import argv
- import time
- import colorama
- from colorama import Fore, Back, Style
- import os
- TCP_PORT = 80
- HEAD = 'POST /api.php HTTP/1.1\r\nHost: 192.168.10.10'
- REQ_ID = {'getSpState':'7000000', 'getPpRegs':'3000000', 'getPp':'4000000',
- 'getHv':'8000000', 'getIp': '9000000', 'getPreset': '10000000',
- 'getPwr':'5000000', 'getMonRegs':'2000000', 'getMon':'1000000',
- 'getInfo':'11000000', 'getSp':'7000000', 'setSys':'7000000',
- 'setSpState': '7000000', 'setUpdate': '99999', 'getUpdate': '99999'}
- time_request = TimeIt()
- time_connect = TimeIt()
- error_no_answer = 0
- erorr_brackets = 0
- error_connection = 0
- class SBS_client:
- def __init__(self, ip='192.168.10.10', port=80, debug=False):
- self.DEBUG = debug
- self.ip = ip
- self.HEAD = 'POST /api.php HTTP/1.1\r\nHost: ' + self.ip + ':' \
- + str(port) + '\r\nContent-Length: '
- self.addr = (ip, int(port))
- def send_recv(self, data):
- data = str.encode(data)
- try:
- self.tcp_socket.send(data)
- except:
- pass
- rec = bytes()
- while True:
- try:
- chank = self.tcp_socket.recv(8000)
- rec += chank
- if not chank:
- break
- except:
- break
- if len(rec) == 0:
- error_no_answer += 1
- # Проверяем ответ на наличие открывающих и закрывающих скобок
- result_str = rec.decode()
- open_brackets = result_str.count('{')
- close_brackets = result_str.count('}')
- if (open_brackets != close_brackets) or open_brackets == 0:
- erorr_brackets += 1
- if self.DEBUG:
- print(Fore.YELLOW + 'Received answer:')
- print(Fore.GREEN + rec.decode())
- @timeit(time_request)
- def http_request(self, method, params):
- # foo = '{"jsonrpc":"2.0","method":"' + method + '","params":' + \
- # params + ',"id":' + REQ_ID[method] + '}'
- # request = self.HEAD + str(len(foo)) + '\r\n\r\n' + foo
- # if self.DEBUG:
- # print(Fore.YELLOW + 'Send HTTP request:')
- # print(Fore.BLUE + request)
- if self.connect():
- # self.send_recv(request)
- self.close()
- @timeit(time_connect)
- def connect(self):
- self.tcp_socket = socket(AF_INET, SOCK_STREAM)
- self.tcp_socket.settimeout(15.0)
- try:
- self.tcp_socket.connect(self.addr)
- except:
- print("Ошибка соединения")
- error_connection += 1
- return False
- return True
- def close(self):
- self.tcp_socket.close()
- # ---------------------------------------------------------------------
- # Обновление
- def set_update(self):
- params = '{"spID":"0", "update":{"serverIp":"172.16.2.8:9000","fileName":"/dir/fw.bin"}}'
- # params = '{"spID":"0", "update":{"serverIp":"172.16.2.8:9000","fileName":"E:\Greenstar\nSBS-ethernet-prime\bin\fw.bin"}}'
- self.http_request('setUpdate', params)
- def get_update(self):
- params = '{"spID":"0"}'
- self.http_request('getUpdate', params)
- # ---------------------------------------------------------------------
- # Уставки
- # setPreset
- def set_preset(self):
- self.send_recv('POST /api\r\n\r\n{{"id":1,"jsonrpc":2.0,"method":\
- "setPreset","params":{"mode":"manual"}}')
- # ---------------------------------------------------------------------
- # Отправка группы команд в начале работы с прибором
- def startup(self):
- self.http_request('getSpState', 'null')
- self.http_request('getPpRegs', 'null')
- self.http_request('getPp', 'null')
- self.http_request('getHv', 'null')
- self.http_request('getIp', 'null')
- self.http_request('getPreset', 'null')
- self.http_request('getPwr', 'null')
- self.http_request('getMonRegs', 'null')
- self.http_request('getMon', 'null')
- self.http_request('getInfo', 'null')
- # Периодический опрос прибора
- def periodic_request(self, count=0):
-
- def procedure():
- self.http_request('getMon', 'null')
- time.sleep(1)
- self.http_request('getSpState', 'null')
- time.sleep(1)
-
- if count == 0:
- while True:
- procedure()
- else:
- for _ in range(count):
- procedure()
-
- # Включить демо-данные
- def set_demo_data(self):
- self.http_request('setSys', '{"demo":true}')
- # Управление набором спектра
- # start/stop/clear
- def set_spectrum_state(self, state: str):
- foo = '{"acqState":"' + state + '"}'
- self.http_request('setSpState', foo)
- self.http_request('getPp', 'null')
- # Запрос спектра
- def get_spectrum_data(self):
- # while True:
- # if self.connect():
- # self.close()
- for i in range(20):
- self.http_request('getSp', 'null')
- time.sleep(0.1)
- # self.http_request('getMon', 'null')
- # time.sleep(0.05)
- self.info()
- def info(self):
- print("\033[H\033[J", end="")
- print("Время соединения :", round(time_connect.cur_time, 5), round(time_connect.max_time, 5))
- print("Время запроса-ответа:", round(time_request.cur_time, 5), round(time_request.max_time, 5))
- print("Ошибки. Скобки:", erorr_brackets)
- print("Ошибки. Пустой ответ:", error_no_answer)
- print("Ошибки. Таймаут соединения:", error_connection)
-
- # Тесты
- def processing(ip, port):
- colorama.init(autoreset=True)
- client = SBS_client(ip, port)
- client.DEBUG = False
- # client.startup()
- # # usb_client.periodic_request(5)
- # client.set_demo_data()
- # client.set_spectrum_state('start')
- client.get_spectrum_data()
- def update():
- colorama.init(autoreset=True)
- dev = SBS_client('172.16.2.101', 80)
- dev.set_update()
- # dev.get_update()
- def test_time():
- print(time.time())
- time.sleep(1)
- print(time.time())
- def main():
- # test_time()
- '''Получение данных спектра'''
- processing(argv[1], argv[2])
- '''Обновление FW'''
- # update()
- # client.set_update()
- # client.get_update()
- # print('{"spID":"0", "update":{"serverIp":"192.168.0.25:80","fileNmae":"полный/путь/к/файлу.bin"}}')
- '''
- if len(argv) == 2:
- print("Укажите IP-адрес и порт устройства")
- else:
- colorama.init(autoreset=True)
-
- '''
- if __name__ == "__main__":
- main()
|