#!/usr/bin/env python3 """Base classes for MT-M2 modems support""" __author__ = "Andrew Kolyaskin" __email__ = "a.kolyaskin@labinsys.ru" __date__ = "04.04.2019" __status__ = "testing" import os import json import sys import time import zlib import socket import subprocess from datetime import datetime from urllib.parse import urlencode from collections import OrderedDict class AnnounceM2: """Container and parser for M2 UDP announce data""" # '{model};{serial};{mac};{sw_ver};{button_set};{button_mode};{stm32_id};;{production_date};{status};{button_box};' SEP = ';' TRUE_S = 'true' __slots__ = ('model', 'serial', 'mac', 'sw_ver', 'button_set', 'button_mode', 'stm32_id', 'production_date', 'status', 'button_box') def __init__(self, model, serial, mac, sw_ver, button_set, button_mode, stm32_id, prod_date, status, button_box): self.model, self.serial, self.mac, self.sw_ver = model, serial, mac, sw_ver self.button_set, self.button_mode, self.stm32_id = button_set, button_mode, stm32_id self.production_date, self.status, self.button_box = prod_date, status, button_box @classmethod def parse(cls, raw): """Parse raw bytes from received datagram""" if not isinstance(raw, str): try: raw = raw.decode() except Exception as e: return sp = raw.split(cls.SEP) if len(sp) != 12: return None true = cls.TRUE_S return cls(sp[0], sp[1], sp[2], sp[3], sp[4] == true, sp[5] == true, sp[6], sp[8], sp[9], sp[10] == true) class DeviceM2Error(Exception): """Exception class for M2 device""" class DeviceM2AuthFail(DeviceM2Error): """This exception raised on possible auth fail (example: got auth page instead of JSON)""" class DeviceM2ConnectionFail(DeviceM2Error): """This exception raised on socket connecting fail""" class DeviceM2: """Metrolog-M2 board service HTTP client""" TRY_COUNT = 3 TIMEOUT = 5 WAIT_TIME = 3 HTTP_PORT = 80 ANNOUNCE_PORT = 49049 T1READY_STATUS = 'T0OK' MODEL = 'Метролог M2' SETTINGS_FORM_KEYS = ('cursor', 'eth_ena', 'eth_prior', 'eth_ip_test', 'gsm1_prior', 'gsm1_ip_test', 'gsm2_prior', 'gsm2_ip_test', 'ipaddr', 'gw', 'mask', 'gsm1_ena', 'gsm1_profile', 'gsm1_apn', 'gsm1_login', 'gsm1_passw', 'gsm2_ena', 'gsm2_profile', 'gsm2_apn', 'gsm2_login', 'gsm2_passw', 'srv_ip', 'srv_port', 'pgw1_en', 'pgw1_dscr', 'pgw1_rs', 'pgw1_baud', 'pgw1_par', 'pgw1_ndata', 'pgw1_nstop', 'pgw1_mode', 'pgw1_trans', 'pgw1_port', 'pgw2_dscr', 'pgw2_rs', 'pgw2_baud', 'pgw2_par', 'pgw2_ndata', 'pgw2_nstop', 'pgw2_mode', 'pgw2_trans', 'pgw2_port', 'pgw3_dscr', 'pgw3_rs', 'pgw3_baud', 'pgw3_par', 'pgw3_ndata', 'pgw3_nstop', 'pgw3_mode', 'pgw3_trans', 'pgw3_port', 'ntpservip1', 'ntpservip2', 'password', 'utc', 'ntp', 'date', 'time') def __init__(self, ip, params=None): self.ip = ip if isinstance(params, AnnounceM2): self.model, self.serial, self.mac = params.model, params.serial, params.mac self.sw_ver, self.stm32_id, self.status = params.sw_ver, params.stm32_id, params.status else: self.model = self.serial = self.mac = self.sw_ver = self.stm32_id = self.status = None self.cookies = '' def ping(self): """Test connection with device using ping""" null = open(os.devnull, 'wb') try: subprocess.check_call(['ping', '-c', '5', '-i', '.8', '-w', '15', '-W', '1', self.ip], timeout=16, stdout=null, stderr=null) return True except (subprocess.CalledProcessError, subprocess.TimeoutExpired): return False def _request(self, path, method='GET', body='', parse_json=True, add_timestamp=False, no_response=False, timeout=0): """Do HTTP request using raw TCP""" sock = socket.socket() sock.settimeout(timeout if timeout else self.TIMEOUT) try: sock.connect((self.ip, self.HTTP_PORT)) if add_timestamp: path = '{}?_={}'.format(path, int(time.time() * 1000)) req = '{} {} HTTP/1.1\r\nContent-Length: {}\r\nCookies: {}\r\n\r\n{}'.format( method, path, len(body), self.cookies, body).encode() sock.sendall(req) time.sleep(self.WAIT_TIME) res = None if no_response else sock.recv(32768) except socket.timeout: raise DeviceM2Error('Тайм-аут связи с тестируемым устройством') except Exception as e: print(e) raise DeviceM2ConnectionFail('Ошибка связи с тестируемым устройством') else: if parse_json and not no_response: try: body = res.split(b'\r\n\r\n')[1] return json.loads(body.decode()) except UnicodeDecodeError: try: html = zlib.decompress(body, 16 + zlib.MAX_WBITS).decode() if 'action="login.cgi"' in html and '

Авторизация

' in html: raise DeviceM2AuthFail('Ошибка авторизации') except (zlib.error, UnicodeDecodeError): raise DeviceM2Error('Некорректный JSON-ответ') except (json.JSONDecodeError, IndexError): raise DeviceM2Error('Некорректный JSON-ответ') else: return res @staticmethod def announce_socket(): """Create socket to receive announce messages""" sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.settimeout(.5) sock.bind(('0.0.0.0', DeviceM2.ANNOUNCE_PORT)) return sock @staticmethod def parse_uptime_str(uptime_str): """Parse ru uptime string, return result in seconds '0 дн. 0 ч. 5 мин.'""" d, h, m = map(int, uptime_str.split()[::2]) return 86400 * d + 3600 * h + 60 * m def wait_announce(self, timeout=None, get_ip=False): """Wait next UDP announce from device""" if timeout is None: timeout = self.TIMEOUT sock = self.announce_socket() start = datetime.now() while True: try: raw, address = sock.recvfrom(4096) except socket.timeout: pass else: announce = AnnounceM2.parse(raw) if announce is not None: if announce.stm32_id == self.stm32_id: return (address, announce) if get_ip else announce finally: if (datetime.now() - start).seconds > timeout: raise DeviceM2Error('Превышено время ожидания анонса') def login(self): """Log in to web interface of device""" if 'M3' in str(self.model): login, password = 'user', 'uchetmo' else: login, password = 'admin', '12345' cookie_mark = b'Set-Cookie: ' res = self._request('/login.cgi', 'POST', f'login={login}&password={password}', False) head = res.split(b'\r\n\r\n')[0] self.cookies = b'; '.join(i[len(cookie_mark):] for i in filter(lambda x: x.startswith(cookie_mark), head.split(b'\r\n'))).decode() return bool(self.cookies) def dump_cookies(self, path): """Save cookies to file""" with open(path, 'w') as f: f.write(self.cookies) def load_cookies(self, path): """Load cookies from file""" with open(path) as f: self.cookies = f.read() def reboot(self): """Send reboot CGI request, do not wait response""" self._request('/reboot.cgi', add_timestamp=True, no_response=True) def update_fw_m2(self, fw_path, part=4096, callback=None): """Update M2 firmware from file""" boundary = '---------------------------69299438174861' data = open(fw_path, 'rb').read() sock = socket.socket() sock.connect((self.ip, self.HTTP_PORT)) body = b''.join((b'-----------------------------69299438174861\r\n' b'Content-Disposition: form-data; name="file1"; filename="MT_M01.bin"\r\n' b'Content-Type: application/octet-stream\r\n\r\n', data, b'\r\n-----------------------------69299438174861--\r\n')) req = 'POST /upload.cgi HTTP/1.1\r\nContent-Type: multipart/form-data; boundary={}\r\n' \ 'Content-Length: {}\r\nCookies: {}\r\n\r\n'.format(boundary, len(body), self.cookies).encode() request = req + body length = len(request) for i in range(0, length, part): sock.sendall(request[i: i + part]) if callable(callback): callback(i, part, length) res = sock.recv(4096) return b'200 OK' in res and res.endswith(b'1') def update_fw_m3(self, fw_path, part=800, callback=None): """Update M3 firmware from file""" boundary = '---------------------------69299438174861' data = open(fw_path, 'rb').read() sock = socket.socket() # sock.setsockopt(socket.SOL_TCP, socket.TCP_MAXSEG, 1300) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.connect((self.ip, self.HTTP_PORT)) sock.settimeout(1) body = b''.join((b'-----------------------------69299438174861\r\n' b'Content-Disposition: form-data; name="file1"; filename="MT_M03.bin"\r\n' b'Content-Type: application/octet-stream\r\n\r\n', data, b'\r\n-----------------------------69299438174861--\r\n')) req = 'POST /upload.cgi HTTP/1.1\r\nContent-Type: multipart/form-data; boundary={}\r\n' \ 'Content-Length: {}\r\nCookies: {}\r\n\r\n'.format(boundary, len(body), self.cookies).encode() request = req + body length = len(request) for i in range(0, length, part): chunk = i + part if chunk > length: chunk = length offset = 0 current_part_length = chunk - i while offset < current_part_length: try: r = sock.send(request[i + offset: chunk]) except (BlockingIOError, socket.timeout): r = 0 print(i) print(f'Sent {r} bytes (from {i + offset} to {chunk} of {length})') offset += r if offset < current_part_length: print('sleep on fail') time.sleep(2) else: print('sleep on ok') time.sleep(1) if callable(callback): callback(i, part, length) time.sleep(1) start_time = time.monotonic() res = b'No any data' print('Reading response...') while time.monotonic() - start_time < 120: try: print(f'Try recv...') res = sock.recv(4096) print(res) break except (BlockingIOError, socket.timeout) as e: time.sleep(0.5) print(e) res = b'No response' print(res) return b'200 OK' in res and res.endswith(b'1') def update_fw(self, fw_path, part=4096, callback=None): """Update Mx firmware from file""" if 'M3' in str(self.model): return self.update_fw_m3(fw_path, part, callback) else: return self.update_fw_m2(fw_path, part, callback) def set_system_variables(self, serial, mac, production_date_str): """Set system variables from dictionary of strings for device through HTTP GET request""" sysvars_dict = dict(serial=serial, mac=mac, proddate=production_date_str) print(sysvars_dict) try: res = self._request('/service_set_sysvars.cgi?{}'.format(urlencode(sysvars_dict)), parse_json=False) except DeviceM2ConnectionFail: res = self._request('/service_set_sysvars.cgi?{}'.format(urlencode(sysvars_dict)), parse_json=False) if b'200' in res: print("return 200") time.sleep(1) a = self.wait_announce() return a else: DeviceM2Error('Ошибка при установке системных переменных') def set_status(self, status): """Set system variables from dictionary of strings for device through HTTP GET request""" sysvars_dict = dict(status_fail=status) try: self._request('/service_set_test_state.cgi?{}'.format(urlencode(sysvars_dict)), parse_json=False) except DeviceM2Error: pass return self.wait_announce().status == status def set_led(self, led, color, state, freq=None): """Control M2 LEDs: server and status, color ::= r | g, state ::= on | off | blink""" assert led in ('server', 'status') assert color in ('r', 'g') assert state in ('on', 'off', 'blink') url = '/service_led.cgi?led={}&color={}&state={}'.format(led, color, state) if freq is not None: url += '&freq={}'.format(int(freq)) self._request(url, parse_json=False) def reboot_to_iap(self): """Reboot device to IAP mode using GET request (for M3 only now)""" return self._request('/goboot.cgi', no_response=True) def check_iap(self): """Check if M2 device is in IAP mode (for M3 only now)""" return b'url=/upload.html' in self._request('/', parse_json=False) def get_info(self): """Get info JSON from WEB interface""" return self._request('/info.cgi', add_timestamp=True) def get_p_info(self): """Get info for long test from special CGI page""" return self._request('/progon.cgi') def get_settings(self): """Get settings JSON from WEB interface""" return self._request('/settings.cgi', add_timestamp=True) def confirm(self): """Send reboot CGI request, do not wait response""" self._request('/confirm.cgi', add_timestamp=True, no_response=False, parse_json=False) @staticmethod def _rectify_settings(settings): """Convert settings JSON from tree to plain structure""" up = dict() for i, obj in enumerate(settings.pop('gsm')): for k, v in obj.items(): up['gsm{}_{}'.format(i + 1, k[4:])] = v for i, obj in enumerate(settings.pop('pgw')): for k, v in obj.items(): up['pgw{}_{}'.format(i + 1, k[4:])] = v settings.update(up) return settings @staticmethod def _to_http_form_value(value): """Convert value to HTTP form format""" if isinstance(value, bool): return 'on' if value else '' elif isinstance(value, float): return str(int(value)) else: return str(value) def _set_settings(self, url, no_response, settings_dict): """Common function for changing M2 settings using HTTP form POSTing""" settings = self.get_settings() r_settings = self._rectify_settings(settings) new_settings = OrderedDict() new_settings['cursor'] = '0' new_settings['password'] = '' for k in self.SETTINGS_FORM_KEYS: if k in r_settings: new_settings[k] = self._to_http_form_value(r_settings[k]) for k, v in settings_dict.items(): if v is None: if k in new_settings: new_settings.pop(k) else: new_settings[k] = v return self._request(url, 'POST', urlencode(new_settings), False, False, no_response) def set_settings(self, **kwargs): """Set ordinary settings for M2""" return self._set_settings('/settings.cgi', False, kwargs) def set_settings_reboot(self, **kwargs): """Set settings causes reboot for M2 - do not wait response""" return self._set_settings('/settings.cgi', True, kwargs) def set_settings_autoconfirm(self, **kwargs): """Set settings with auto-confirmation for M2, use service URL""" return self._set_settings('/settings_service.cgi', False, kwargs) def reset_settings(self): """Reset device settings to default""" self._request('/reset.cgi', parse_json=False, add_timestamp=True) def main(ip: str, path: str): #d = DeviceM2('178.176.41.47') #d = DeviceM2('192.168.10.254') d = DeviceM2(ip) d.model = 'M3' d.login() # d.set_led('status', 'g', 'on') # print(d.get_info()) print(d.set_system_variables('7347382', 'EC-4C-4D-02-0E-D6', '05.07.2023+17:41')) # l = d.login() # print(f'Login - {l}') # d.update_fw_m3(path) # time.sleep(10) """ {'uptime': '0 дн. 0 ч. 4 мин.', 'comment': '', 'incharge': '', 'digit_id': 0.0, 'fwversion': '2.014', 'iapversion': '1.03', 'owner': '', 'macaddr': 'EC-4C-4D-02-0E-D6', 'model': 'Метролог M3', 'mboard_rev': '7', 'dboard_rev': '7', 'mfr': 'АО "НПК РоТеК"', 'sysLocation': '', 'prodate': '06.07.2023+17:41', 'serno': '7347382', 'netsettings_changed': False, 'utm': 1760983071.0} """ if __name__ == '__main__': main("192.168.1.101", "fw.bin") # if len(sys.argv) != 3: # print(f'Usage: {sys.argv[0]} ip fw_binary_path') # sys.exit(2) # sys.exit(main(sys.argv[1], sys.argv[2]))