|
@@ -0,0 +1,423 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+
|
|
|
+"""Base classes for MT-M2 modems support"""
|
|
|
+
|
|
|
+__author__ = "Andrew Kolyaskin"
|
|
|
+__email__ = "a.kolyaskin@labinsys.ru"
|
|
|
+__date__ = "04.04.2019"
|
|
|
+__status__ = "testing"
|
|
|
+
|
|
|
+
|
|
|
+import os
|
|
|
+import json
|
|
|
+import sys
|
|
|
+import time
|
|
|
+import zlib
|
|
|
+import socket
|
|
|
+import subprocess
|
|
|
+from datetime import datetime
|
|
|
+from urllib.parse import urlencode
|
|
|
+from collections import OrderedDict
|
|
|
+
|
|
|
+
|
|
|
+class AnnounceM2:
|
|
|
+ """Container and parser for M2 UDP announce data"""
|
|
|
+ # '{model};{serial};{mac};{sw_ver};{button_set};{button_mode};{stm32_id};;{production_date};{status};{button_box};'
|
|
|
+ SEP = ';'
|
|
|
+ TRUE_S = 'true'
|
|
|
+ __slots__ = ('model', 'serial', 'mac', 'sw_ver', 'button_set',
|
|
|
+ 'button_mode', 'stm32_id', 'production_date', 'status', 'button_box')
|
|
|
+
|
|
|
+ def __init__(self, model, serial, mac, sw_ver, button_set, button_mode, stm32_id, prod_date, status, button_box):
|
|
|
+ self.model, self.serial, self.mac, self.sw_ver = model, serial, mac, sw_ver
|
|
|
+ self.button_set, self.button_mode, self.stm32_id = button_set, button_mode, stm32_id
|
|
|
+ self.production_date, self.status, self.button_box = prod_date, status, button_box
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def parse(cls, raw):
|
|
|
+ """Parse raw bytes from received datagram"""
|
|
|
+ if not isinstance(raw, str):
|
|
|
+ try:
|
|
|
+ raw = raw.decode()
|
|
|
+ except Exception as e:
|
|
|
+ return
|
|
|
+ sp = raw.split(cls.SEP)
|
|
|
+ if len(sp) != 12:
|
|
|
+ return None
|
|
|
+ true = cls.TRUE_S
|
|
|
+ return cls(sp[0], sp[1], sp[2], sp[3], sp[4] == true, sp[5] == true, sp[6], sp[8], sp[9], sp[10] == true)
|
|
|
+
|
|
|
+
|
|
|
+class DeviceM2Error(Exception):
|
|
|
+ """Exception class for M2 device"""
|
|
|
+
|
|
|
+
|
|
|
+class DeviceM2AuthFail(DeviceM2Error):
|
|
|
+ """This exception raised on possible auth fail (example: got auth page instead of JSON)"""
|
|
|
+
|
|
|
+
|
|
|
+class DeviceM2ConnectionFail(DeviceM2Error):
|
|
|
+ """This exception raised on socket connecting fail"""
|
|
|
+
|
|
|
+
|
|
|
+class DeviceM2:
|
|
|
+ """Metrolog-M2 board service HTTP client"""
|
|
|
+ TRY_COUNT = 3
|
|
|
+ TIMEOUT = 5
|
|
|
+ WAIT_TIME = 3
|
|
|
+ HTTP_PORT = 80
|
|
|
+ ANNOUNCE_PORT = 49049
|
|
|
+ T1READY_STATUS = 'T0OK'
|
|
|
+ MODEL = 'Метролог M2'
|
|
|
+ SETTINGS_FORM_KEYS = ('cursor', 'eth_ena', 'eth_prior', 'eth_ip_test', 'gsm1_prior', 'gsm1_ip_test', 'gsm2_prior',
|
|
|
+ 'gsm2_ip_test', 'ipaddr', 'gw', 'mask', 'gsm1_ena', 'gsm1_profile', 'gsm1_apn', 'gsm1_login',
|
|
|
+ 'gsm1_passw', 'gsm2_ena', 'gsm2_profile', 'gsm2_apn', 'gsm2_login', 'gsm2_passw', 'srv_ip',
|
|
|
+ 'srv_port', 'pgw1_en', 'pgw1_dscr', 'pgw1_rs', 'pgw1_baud', 'pgw1_par', 'pgw1_ndata',
|
|
|
+ 'pgw1_nstop', 'pgw1_mode', 'pgw1_trans', 'pgw1_port', 'pgw2_dscr', 'pgw2_rs', 'pgw2_baud',
|
|
|
+ 'pgw2_par', 'pgw2_ndata', 'pgw2_nstop', 'pgw2_mode', 'pgw2_trans', 'pgw2_port', 'pgw3_dscr',
|
|
|
+ 'pgw3_rs', 'pgw3_baud', 'pgw3_par', 'pgw3_ndata', 'pgw3_nstop', 'pgw3_mode', 'pgw3_trans',
|
|
|
+ 'pgw3_port', 'ntpservip1', 'ntpservip2', 'password', 'utc', 'ntp', 'date', 'time')
|
|
|
+
|
|
|
+ def __init__(self, ip, params=None):
|
|
|
+ self.ip = ip
|
|
|
+ if isinstance(params, AnnounceM2):
|
|
|
+ self.model, self.serial, self.mac = params.model, params.serial, params.mac
|
|
|
+ self.sw_ver, self.stm32_id, self.status = params.sw_ver, params.stm32_id, params.status
|
|
|
+ else:
|
|
|
+ self.model = self.serial = self.mac = self.sw_ver = self.stm32_id = self.status = None
|
|
|
+ self.cookies = ''
|
|
|
+
|
|
|
+ def ping(self):
|
|
|
+ """Test connection with device using ping"""
|
|
|
+ null = open(os.devnull, 'wb')
|
|
|
+ try:
|
|
|
+ subprocess.check_call(['ping', '-c', '5', '-i', '.8', '-w', '15', '-W', '1', self.ip],
|
|
|
+ timeout=16, stdout=null, stderr=null)
|
|
|
+ return True
|
|
|
+ except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
|
|
|
+ return False
|
|
|
+
|
|
|
+ def _request(self, path, method='GET', body='', parse_json=True, add_timestamp=False, no_response=False, timeout=0):
|
|
|
+ """Do HTTP request using raw TCP"""
|
|
|
+ sock = socket.socket()
|
|
|
+ sock.settimeout(timeout if timeout else self.TIMEOUT)
|
|
|
+ try:
|
|
|
+ sock.connect((self.ip, self.HTTP_PORT))
|
|
|
+ if add_timestamp:
|
|
|
+ path = '{}?_={}'.format(path, int(time.time() * 1000))
|
|
|
+ req = '{} {} HTTP/1.1\r\nContent-Length: {}\r\nCookies: {}\r\n\r\n{}'.format(
|
|
|
+ method, path, len(body), self.cookies, body).encode()
|
|
|
+ sock.sendall(req)
|
|
|
+ time.sleep(self.WAIT_TIME)
|
|
|
+ res = None if no_response else sock.recv(32768)
|
|
|
+ except socket.timeout:
|
|
|
+ raise DeviceM2Error('Тайм-аут связи с тестируемым устройством')
|
|
|
+ except Exception as e:
|
|
|
+ print(e)
|
|
|
+ raise DeviceM2ConnectionFail('Ошибка связи с тестируемым устройством')
|
|
|
+ else:
|
|
|
+ if parse_json and not no_response:
|
|
|
+ try:
|
|
|
+ body = res.split(b'\r\n\r\n')[1]
|
|
|
+ return json.loads(body.decode())
|
|
|
+ except UnicodeDecodeError:
|
|
|
+ try:
|
|
|
+ html = zlib.decompress(body, 16 + zlib.MAX_WBITS).decode()
|
|
|
+ if 'action="login.cgi"' in html and '<h1>Авторизация</h1>' in html:
|
|
|
+ raise DeviceM2AuthFail('Ошибка авторизации')
|
|
|
+ except (zlib.error, UnicodeDecodeError):
|
|
|
+ raise DeviceM2Error('Некорректный JSON-ответ')
|
|
|
+ except (json.JSONDecodeError, IndexError):
|
|
|
+ raise DeviceM2Error('Некорректный JSON-ответ')
|
|
|
+ else:
|
|
|
+ return res
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def announce_socket():
|
|
|
+ """Create socket to receive announce messages"""
|
|
|
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
|
+ sock.settimeout(.5)
|
|
|
+ sock.bind(('0.0.0.0', DeviceM2.ANNOUNCE_PORT))
|
|
|
+ return sock
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def parse_uptime_str(uptime_str):
|
|
|
+ """Parse ru uptime string, return result in seconds '0 дн. 0 ч. 5 мин.'"""
|
|
|
+ d, h, m = map(int, uptime_str.split()[::2])
|
|
|
+ return 86400 * d + 3600 * h + 60 * m
|
|
|
+
|
|
|
+ def wait_announce(self, timeout=None, get_ip=False):
|
|
|
+ """Wait next UDP announce from device"""
|
|
|
+ if timeout is None:
|
|
|
+ timeout = self.TIMEOUT
|
|
|
+ sock = self.announce_socket()
|
|
|
+ start = datetime.now()
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ raw, address = sock.recvfrom(4096)
|
|
|
+ except socket.timeout:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ announce = AnnounceM2.parse(raw)
|
|
|
+ if announce is not None:
|
|
|
+ if announce.stm32_id == self.stm32_id:
|
|
|
+ return (address, announce) if get_ip else announce
|
|
|
+ finally:
|
|
|
+ if (datetime.now() - start).seconds > timeout:
|
|
|
+ raise DeviceM2Error('Превышено время ожидания анонса')
|
|
|
+
|
|
|
+ def login(self):
|
|
|
+ """Log in to web interface of device"""
|
|
|
+ if 'M3' in str(self.model):
|
|
|
+ login, password = 'user', 'uchetmo'
|
|
|
+ else:
|
|
|
+ login, password = 'admin', '12345'
|
|
|
+ cookie_mark = b'Set-Cookie: '
|
|
|
+ res = self._request('/login.cgi', 'POST', f'login={login}&password={password}', False)
|
|
|
+ head = res.split(b'\r\n\r\n')[0]
|
|
|
+ self.cookies = b'; '.join(i[len(cookie_mark):] for i in filter(lambda x: x.startswith(cookie_mark),
|
|
|
+ head.split(b'\r\n'))).decode()
|
|
|
+ return bool(self.cookies)
|
|
|
+
|
|
|
+ def dump_cookies(self, path):
|
|
|
+ """Save cookies to file"""
|
|
|
+ with open(path, 'w') as f:
|
|
|
+ f.write(self.cookies)
|
|
|
+
|
|
|
+ def load_cookies(self, path):
|
|
|
+ """Load cookies from file"""
|
|
|
+ with open(path) as f:
|
|
|
+ self.cookies = f.read()
|
|
|
+
|
|
|
+ def reboot(self):
|
|
|
+ """Send reboot CGI request, do not wait response"""
|
|
|
+ self._request('/reboot.cgi', add_timestamp=True, no_response=True)
|
|
|
+
|
|
|
+ def update_fw_m2(self, fw_path, part=4096, callback=None):
|
|
|
+ """Update M2 firmware from file"""
|
|
|
+ boundary = '---------------------------69299438174861'
|
|
|
+ data = open(fw_path, 'rb').read()
|
|
|
+ sock = socket.socket()
|
|
|
+ sock.connect((self.ip, self.HTTP_PORT))
|
|
|
+ body = b''.join((b'-----------------------------69299438174861\r\n'
|
|
|
+ b'Content-Disposition: form-data; name="file1"; filename="MT_M01.bin"\r\n'
|
|
|
+ b'Content-Type: application/octet-stream\r\n\r\n', data,
|
|
|
+ b'\r\n-----------------------------69299438174861--\r\n'))
|
|
|
+ req = 'POST /upload.cgi HTTP/1.1\r\nContent-Type: multipart/form-data; boundary={}\r\n' \
|
|
|
+ 'Content-Length: {}\r\nCookies: {}\r\n\r\n'.format(boundary, len(body), self.cookies).encode()
|
|
|
+ request = req + body
|
|
|
+ length = len(request)
|
|
|
+ for i in range(0, length, part):
|
|
|
+ sock.sendall(request[i: i + part])
|
|
|
+ if callable(callback):
|
|
|
+ callback(i, part, length)
|
|
|
+ res = sock.recv(4096)
|
|
|
+ return b'200 OK' in res and res.endswith(b'1')
|
|
|
+
|
|
|
+ def update_fw_m3(self, fw_path, part=800, callback=None):
|
|
|
+ """Update M3 firmware from file"""
|
|
|
+ boundary = '---------------------------69299438174861'
|
|
|
+ data = open(fw_path, 'rb').read()
|
|
|
+ sock = socket.socket()
|
|
|
+ # sock.setsockopt(socket.SOL_TCP, socket.TCP_MAXSEG, 1300)
|
|
|
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
|
+ sock.connect((self.ip, self.HTTP_PORT))
|
|
|
+ sock.settimeout(1)
|
|
|
+ body = b''.join((b'-----------------------------69299438174861\r\n'
|
|
|
+ b'Content-Disposition: form-data; name="file1"; filename="MT_M03.bin"\r\n'
|
|
|
+ b'Content-Type: application/octet-stream\r\n\r\n', data,
|
|
|
+ b'\r\n-----------------------------69299438174861--\r\n'))
|
|
|
+ req = 'POST /upload.cgi HTTP/1.1\r\nContent-Type: multipart/form-data; boundary={}\r\n' \
|
|
|
+ 'Content-Length: {}\r\nCookies: {}\r\n\r\n'.format(boundary, len(body), self.cookies).encode()
|
|
|
+ request = req + body
|
|
|
+ length = len(request)
|
|
|
+ for i in range(0, length, part):
|
|
|
+ chunk = i + part
|
|
|
+ if chunk > length:
|
|
|
+ chunk = length
|
|
|
+ offset = 0
|
|
|
+ current_part_length = chunk - i
|
|
|
+ while offset < current_part_length:
|
|
|
+ try:
|
|
|
+ r = sock.send(request[i + offset: chunk])
|
|
|
+ except (BlockingIOError, socket.timeout):
|
|
|
+ r = 0
|
|
|
+ print(i)
|
|
|
+ print(f'Sent {r} bytes (from {i + offset} to {chunk} of {length})')
|
|
|
+ offset += r
|
|
|
+ if offset < current_part_length:
|
|
|
+ print('sleep on fail')
|
|
|
+ time.sleep(2)
|
|
|
+ else:
|
|
|
+ print('sleep on ok')
|
|
|
+ time.sleep(1)
|
|
|
+ if callable(callback):
|
|
|
+ callback(i, part, length)
|
|
|
+ time.sleep(1)
|
|
|
+ start_time = time.monotonic()
|
|
|
+ res = b'No any data'
|
|
|
+ print('Reading response...')
|
|
|
+ while time.monotonic() - start_time < 120:
|
|
|
+ try:
|
|
|
+ print(f'Try recv...')
|
|
|
+ res = sock.recv(4096)
|
|
|
+ print(res)
|
|
|
+ break
|
|
|
+ except (BlockingIOError, socket.timeout) as e:
|
|
|
+ time.sleep(0.5)
|
|
|
+ print(e)
|
|
|
+ res = b'No response'
|
|
|
+ print(res)
|
|
|
+ return b'200 OK' in res and res.endswith(b'1')
|
|
|
+
|
|
|
+ def update_fw(self, fw_path, part=4096, callback=None):
|
|
|
+ """Update Mx firmware from file"""
|
|
|
+ if 'M3' in str(self.model):
|
|
|
+ return self.update_fw_m3(fw_path, part, callback)
|
|
|
+ else:
|
|
|
+ return self.update_fw_m2(fw_path, part, callback)
|
|
|
+
|
|
|
+ def set_system_variables(self, serial, mac, production_date_str):
|
|
|
+ """Set system variables from dictionary of strings for device through HTTP GET request"""
|
|
|
+ sysvars_dict = dict(serial=serial, mac=mac, proddate=production_date_str)
|
|
|
+ print(sysvars_dict)
|
|
|
+ try:
|
|
|
+ res = self._request('/service_set_sysvars.cgi?{}'.format(urlencode(sysvars_dict)), parse_json=False)
|
|
|
+ except DeviceM2ConnectionFail:
|
|
|
+ res = self._request('/service_set_sysvars.cgi?{}'.format(urlencode(sysvars_dict)), parse_json=False)
|
|
|
+ if b'200' in res:
|
|
|
+ print("return 200")
|
|
|
+ time.sleep(1)
|
|
|
+ a = self.wait_announce()
|
|
|
+ return a
|
|
|
+ else:
|
|
|
+ DeviceM2Error('Ошибка при установке системных переменных')
|
|
|
+
|
|
|
+ def set_status(self, status):
|
|
|
+ """Set system variables from dictionary of strings for device through HTTP GET request"""
|
|
|
+ sysvars_dict = dict(status_fail=status)
|
|
|
+ try:
|
|
|
+ self._request('/service_set_test_state.cgi?{}'.format(urlencode(sysvars_dict)), parse_json=False)
|
|
|
+ except DeviceM2Error:
|
|
|
+ pass
|
|
|
+ return self.wait_announce().status == status
|
|
|
+
|
|
|
+ def set_led(self, led, color, state, freq=None):
|
|
|
+ """Control M2 LEDs: server and status, color ::= r | g, state ::= on | off | blink"""
|
|
|
+ assert led in ('server', 'status')
|
|
|
+ assert color in ('r', 'g')
|
|
|
+ assert state in ('on', 'off', 'blink')
|
|
|
+ url = '/service_led.cgi?led={}&color={}&state={}'.format(led, color, state)
|
|
|
+ if freq is not None:
|
|
|
+ url += '&freq={}'.format(int(freq))
|
|
|
+ self._request(url, parse_json=False)
|
|
|
+
|
|
|
+ def reboot_to_iap(self):
|
|
|
+ """Reboot device to IAP mode using GET request (for M3 only now)"""
|
|
|
+ return self._request('/goboot.cgi', no_response=True)
|
|
|
+
|
|
|
+ def check_iap(self):
|
|
|
+ """Check if M2 device is in IAP mode (for M3 only now)"""
|
|
|
+ return b'url=/upload.html' in self._request('/', parse_json=False)
|
|
|
+
|
|
|
+ def get_info(self):
|
|
|
+ """Get info JSON from WEB interface"""
|
|
|
+ return self._request('/info.cgi', add_timestamp=True)
|
|
|
+
|
|
|
+ def get_p_info(self):
|
|
|
+ """Get info for long test from special CGI page"""
|
|
|
+ return self._request('/progon.cgi')
|
|
|
+
|
|
|
+ def get_settings(self):
|
|
|
+ """Get settings JSON from WEB interface"""
|
|
|
+ return self._request('/settings.cgi', add_timestamp=True)
|
|
|
+
|
|
|
+ def confirm(self):
|
|
|
+ """Send reboot CGI request, do not wait response"""
|
|
|
+ self._request('/confirm.cgi', add_timestamp=True, no_response=False, parse_json=False)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _rectify_settings(settings):
|
|
|
+ """Convert settings JSON from tree to plain structure"""
|
|
|
+ up = dict()
|
|
|
+ for i, obj in enumerate(settings.pop('gsm')):
|
|
|
+ for k, v in obj.items():
|
|
|
+ up['gsm{}_{}'.format(i + 1, k[4:])] = v
|
|
|
+ for i, obj in enumerate(settings.pop('pgw')):
|
|
|
+ for k, v in obj.items():
|
|
|
+ up['pgw{}_{}'.format(i + 1, k[4:])] = v
|
|
|
+ settings.update(up)
|
|
|
+ return settings
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _to_http_form_value(value):
|
|
|
+ """Convert value to HTTP form format"""
|
|
|
+ if isinstance(value, bool):
|
|
|
+ return 'on' if value else ''
|
|
|
+ elif isinstance(value, float):
|
|
|
+ return str(int(value))
|
|
|
+ else:
|
|
|
+ return str(value)
|
|
|
+
|
|
|
+ def _set_settings(self, url, no_response, settings_dict):
|
|
|
+ """Common function for changing M2 settings using HTTP form POSTing"""
|
|
|
+ settings = self.get_settings()
|
|
|
+ r_settings = self._rectify_settings(settings)
|
|
|
+ new_settings = OrderedDict()
|
|
|
+ new_settings['cursor'] = '0'
|
|
|
+ new_settings['password'] = ''
|
|
|
+ for k in self.SETTINGS_FORM_KEYS:
|
|
|
+ if k in r_settings:
|
|
|
+ new_settings[k] = self._to_http_form_value(r_settings[k])
|
|
|
+ for k, v in settings_dict.items():
|
|
|
+ if v is None:
|
|
|
+ if k in new_settings:
|
|
|
+ new_settings.pop(k)
|
|
|
+ else:
|
|
|
+ new_settings[k] = v
|
|
|
+ return self._request(url, 'POST', urlencode(new_settings), False, False, no_response)
|
|
|
+
|
|
|
+ def set_settings(self, **kwargs):
|
|
|
+ """Set ordinary settings for M2"""
|
|
|
+ return self._set_settings('/settings.cgi', False, kwargs)
|
|
|
+
|
|
|
+ def set_settings_reboot(self, **kwargs):
|
|
|
+ """Set settings causes reboot for M2 - do not wait response"""
|
|
|
+ return self._set_settings('/settings.cgi', True, kwargs)
|
|
|
+
|
|
|
+ def set_settings_autoconfirm(self, **kwargs):
|
|
|
+ """Set settings with auto-confirmation for M2, use service URL"""
|
|
|
+ return self._set_settings('/settings_service.cgi', False, kwargs)
|
|
|
+
|
|
|
+ def reset_settings(self):
|
|
|
+ """Reset device settings to default"""
|
|
|
+ self._request('/reset.cgi', parse_json=False, add_timestamp=True)
|
|
|
+
|
|
|
+
|
|
|
+def main(ip: str, path: str):
|
|
|
+ #d = DeviceM2('178.176.41.47')
|
|
|
+ #d = DeviceM2('192.168.10.254')
|
|
|
+ d = DeviceM2(ip)
|
|
|
+ d.model = 'M3'
|
|
|
+ d.login()
|
|
|
+ # d.set_led('status', 'g', 'on')
|
|
|
+
|
|
|
+
|
|
|
+ # print(d.get_info())
|
|
|
+ print(d.set_system_variables('7347382', 'EC-4C-4D-02-0E-D6', '05.07.2023+17:41'))
|
|
|
+ # l = d.login()
|
|
|
+ # print(f'Login - {l}')
|
|
|
+ # d.update_fw_m3(path)
|
|
|
+ # time.sleep(10)
|
|
|
+
|
|
|
+"""
|
|
|
+{'uptime': '0 дн. 0 ч. 4 мин.', 'comment': '', 'incharge': '', 'digit_id': 0.0, 'fwversion': '2.014', 'iapversion': '1.03', 'owner': '', 'macaddr': 'EC-4C-4D-02-0E-D6', 'model': 'Метролог M3', 'mboard_rev': '7', 'dboard_rev': '7', 'mfr': 'АО "НПК РоТеК"', 'sysLocation': '', 'prodate': '06.07.2023+17:41', 'serno': '7347382', 'netsettings_changed': False, 'utm': 1760983071.0}
|
|
|
+"""
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ main("192.168.1.101", "fw.bin")
|
|
|
+ # if len(sys.argv) != 3:
|
|
|
+ # print(f'Usage: {sys.argv[0]} ip fw_binary_path')
|
|
|
+ # sys.exit(2)
|
|
|
+ # sys.exit(main(sys.argv[1], sys.argv[2]))
|