123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- #!/usr/bin/env python3
- """Base classes for MT-M2 modems support"""
- __author__ = "Andrew Kolyaskin"
- __email__ = "a.kolyaskin@labinsys.ru"
- __date__ = "04.04.2019"
- __status__ = "testing"
- import os
- import json
- import sys
- import time
- import zlib
- import socket
- import subprocess
- from datetime import datetime
- from urllib.parse import urlencode
- from collections import OrderedDict
class AnnounceM2:
    """Container and parser for M2 UDP announce data.

    An announce datagram is a ``;``-separated text record laid out as::

        {model};{serial};{mac};{sw_ver};{button_set};{button_mode};{stm32_id};;{production_date};{status};{button_box};

    Splitting it on ``;`` yields exactly 12 fields. Fields 7 and 11 (the empty
    slot after ``stm32_id`` and the one produced by the trailing separator)
    are not used by the parser.
    """

    SEP = ';'
    TRUE_S = 'true'
    # Number of fields produced by splitting a well-formed announce on SEP.
    _FIELD_COUNT = 12

    __slots__ = ('model', 'serial', 'mac', 'sw_ver', 'button_set',
                 'button_mode', 'stm32_id', 'production_date', 'status', 'button_box')

    def __init__(self, model, serial, mac, sw_ver, button_set, button_mode, stm32_id, prod_date, status, button_box):
        """Store the already-parsed announce fields (``prod_date`` is kept as ``production_date``)."""
        self.model, self.serial, self.mac, self.sw_ver = model, serial, mac, sw_ver
        self.button_set, self.button_mode, self.stm32_id = button_set, button_mode, stm32_id
        self.production_date, self.status, self.button_box = prod_date, status, button_box

    def __repr__(self):
        """Return a debugging representation listing every slot."""
        fields = ', '.join('{}={!r}'.format(name, getattr(self, name)) for name in self.__slots__)
        return '{}({})'.format(type(self).__name__, fields)

    @classmethod
    def parse(cls, raw):
        """Parse a received datagram into an ``AnnounceM2``.

        :param raw: announce payload as ``bytes`` (decoded with the default codec) or ``str``
        :return: a new instance, or ``None`` when the payload cannot be decoded
                 or does not contain exactly 12 fields
        """
        if not isinstance(raw, str):
            try:
                raw = raw.decode()
            except (AttributeError, UnicodeDecodeError):
                # Not bytes-like, or not valid text: this is not an announce.
                return None
        sp = raw.split(cls.SEP)
        if len(sp) != cls._FIELD_COUNT:
            return None
        true = cls.TRUE_S
        # The three button flags arrive as text; anything other than 'true' is False.
        return cls(sp[0], sp[1], sp[2], sp[3], sp[4] == true, sp[5] == true, sp[6], sp[8], sp[9], sp[10] == true)
class DeviceM2Error(Exception):
    """Base class for every error raised by the M2 device client."""


class DeviceM2AuthFail(DeviceM2Error):
    """Raised when authorisation has probably failed (e.g. the auth page was returned instead of JSON)."""


class DeviceM2ConnectionFail(DeviceM2Error):
    """Raised when the socket connection to the device fails."""
class DeviceM2:
    """Metrolog-M2 board service HTTP client.

    Requests are sent as hand-built HTTP/1.1 messages over a plain TCP socket
    (see ``_request``); device discovery and state checks rely on the UDP
    announce datagrams parsed by ``AnnounceM2``.
    """

    TRY_COUNT = 3  # not referenced inside this class
    TIMEOUT = 5  # default socket timeout and announce wait, seconds
    WAIT_TIME = 3  # pause between sending a request and reading the reply, seconds
    HTTP_PORT = 80
    ANNOUNCE_PORT = 49049
    T1READY_STATUS = 'T0OK'
    MODEL = 'Метролог M2'
    # Form fields, in the order they are posted by ``_set_settings``.
    SETTINGS_FORM_KEYS = ('cursor', 'eth_ena', 'eth_prior', 'eth_ip_test', 'gsm1_prior', 'gsm1_ip_test', 'gsm2_prior',
                          'gsm2_ip_test', 'ipaddr', 'gw', 'mask', 'gsm1_ena', 'gsm1_profile', 'gsm1_apn', 'gsm1_login',
                          'gsm1_passw', 'gsm2_ena', 'gsm2_profile', 'gsm2_apn', 'gsm2_login', 'gsm2_passw', 'srv_ip',
                          'srv_port', 'pgw1_en', 'pgw1_dscr', 'pgw1_rs', 'pgw1_baud', 'pgw1_par', 'pgw1_ndata',
                          'pgw1_nstop', 'pgw1_mode', 'pgw1_trans', 'pgw1_port', 'pgw2_dscr', 'pgw2_rs', 'pgw2_baud',
                          'pgw2_par', 'pgw2_ndata', 'pgw2_nstop', 'pgw2_mode', 'pgw2_trans', 'pgw2_port', 'pgw3_dscr',
                          'pgw3_rs', 'pgw3_baud', 'pgw3_par', 'pgw3_ndata', 'pgw3_nstop', 'pgw3_mode', 'pgw3_trans',
                          'pgw3_port', 'ntpservip1', 'ntpservip2', 'password', 'utc', 'ntp', 'date', 'time')

    def __init__(self, ip, params=None):
        """Create a client for the device at ``ip``.

        :param ip: device IP address
        :param params: optional ``AnnounceM2`` whose identity fields are copied;
                       any other value leaves those fields set to ``None``
        """
        self.ip = ip
        if isinstance(params, AnnounceM2):
            self.model, self.serial, self.mac = params.model, params.serial, params.mac
            self.sw_ver, self.stm32_id, self.status = params.sw_ver, params.stm32_id, params.status
        else:
            self.model = self.serial = self.mac = self.sw_ver = self.stm32_id = self.status = None
        self.cookies = ''

    def ping(self):
        """Test connection with the device using the system ``ping``.

        :return: ``True`` when ``ping`` exits successfully, ``False`` on a
                 non-zero exit status or when the 16 s guard timeout expires
        """
        try:
            # DEVNULL avoids opening (and leaking) a handle to os.devnull.
            subprocess.check_call(['ping', '-c', '5', '-i', '.8', '-w', '15', '-W', '1', self.ip],
                                  timeout=16, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            return False
        return True

    def _request(self, path, method='GET', body='', parse_json=True, add_timestamp=False, no_response=False, timeout=0):
        """Do an HTTP request using raw TCP.

        :param path: request path (may already contain a query string)
        :param method: HTTP method
        :param body: request body, ``str`` (encoded with the default codec) or ``bytes``
        :param parse_json: decode the response body as JSON and return the result
        :param add_timestamp: append ``?_=<milliseconds>`` to ``path``
        :param no_response: do not read a reply at all; ``None`` is returned
        :param timeout: socket timeout in seconds; a falsy value selects ``TIMEOUT``
        :return: parsed JSON, the raw response bytes, or ``None`` (see flags above)
        :raises DeviceM2Error: on socket timeout or a response that is not valid JSON
        :raises DeviceM2ConnectionFail: on any other failure while talking to the device
        :raises DeviceM2AuthFail: when a gzip-compressed login page comes back instead of JSON
        """
        if add_timestamp:
            path = '{}?_={}'.format(path, int(time.time() * 1000))
        payload = body.encode() if isinstance(body, str) else body
        # Content-Length must count bytes, not characters.
        # NOTE(review): the header is spelled 'Cookies' (not the standard 'Cookie');
        # kept as-is on the assumption that the firmware expects it - confirm.
        head = '{} {} HTTP/1.1\r\nContent-Length: {}\r\nCookies: {}\r\n\r\n'.format(
            method, path, len(payload), self.cookies).encode()
        sock = socket.socket()
        sock.settimeout(timeout if timeout else self.TIMEOUT)
        try:
            sock.connect((self.ip, self.HTTP_PORT))
            sock.sendall(head + payload)
            time.sleep(self.WAIT_TIME)
            # A single recv is assumed to return the complete reply after the pause above.
            res = None if no_response else sock.recv(32768)
        except socket.timeout as e:
            raise DeviceM2Error('Тайм-аут связи с тестируемым устройством') from e
        except Exception as e:
            print(e)
            raise DeviceM2ConnectionFail('Ошибка связи с тестируемым устройством') from e
        finally:
            sock.close()
        if no_response or not parse_json:
            return res
        try:
            # maxsplit=1: the body itself may contain a blank line (likely for gzip data).
            raw_body = res.split(b'\r\n\r\n', 1)[1]
            return json.loads(raw_body.decode())
        except UnicodeDecodeError as e:
            # Body is not text: try it as gzip to detect the login page.
            try:
                html = zlib.decompress(raw_body, 16 + zlib.MAX_WBITS).decode()
            except (zlib.error, UnicodeDecodeError) as inner:
                raise DeviceM2Error('Некорректный JSON-ответ') from inner
            if 'action="login.cgi"' in html and '<h1>Авторизация</h1>' in html:
                raise DeviceM2AuthFail('Ошибка авторизации') from e
            # Some other compressed page: still not the JSON that was asked for.
            raise DeviceM2Error('Некорректный JSON-ответ') from e
        except (json.JSONDecodeError, IndexError) as e:
            raise DeviceM2Error('Некорректный JSON-ответ') from e

    @staticmethod
    def announce_socket():
        """Create a UDP socket bound to ``ANNOUNCE_PORT`` with a 0.5 s receive timeout.

        The caller owns the returned socket and must close it.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.settimeout(.5)
            sock.bind(('0.0.0.0', DeviceM2.ANNOUNCE_PORT))
        except Exception:
            # Do not leak the descriptor when configuration or bind fails.
            sock.close()
            raise
        return sock

    @staticmethod
    def parse_uptime_str(uptime_str):
        """Parse a Russian uptime string such as ``'0 дн. 0 ч. 5 мин.'`` and return seconds.

        Every second whitespace-separated token (days, hours, minutes) is read as an integer.
        """
        d, h, m = map(int, uptime_str.split()[::2])
        return 86400 * d + 3600 * h + 60 * m

    def wait_announce(self, timeout=None, get_ip=False):
        """Wait for the next UDP announce whose ``stm32_id`` matches this device.

        :param timeout: maximum wait in seconds; ``None`` selects ``TIMEOUT``
        :param get_ip: return ``(sender_address, announce)`` instead of just the announce
        :return: the matching ``AnnounceM2`` (optionally with the sender address)
        :raises DeviceM2Error: when no matching announce arrives in time
        """
        if timeout is None:
            timeout = self.TIMEOUT
        deadline = time.monotonic() + timeout
        with self.announce_socket() as sock:
            while True:
                try:
                    raw, address = sock.recvfrom(4096)
                except socket.timeout:
                    pass
                else:
                    announce = AnnounceM2.parse(raw)
                    if announce is not None and announce.stm32_id == self.stm32_id:
                        return (address, announce) if get_ip else announce
                # Checked only after a miss, so a match is never discarded by the deadline.
                if time.monotonic() > deadline:
                    raise DeviceM2Error('Превышено время ожидания анонса')

    def login(self):
        """Log in to the web interface and remember the cookies it sets.

        :return: ``True`` when at least one ``Set-Cookie`` header was received
        """
        # NOTE(review): credentials are hard-coded per model; consider making them configurable.
        if 'M3' in str(self.model):
            login, password = 'user', 'uchetmo'
        else:
            login, password = 'admin', '12345'
        cookie_mark = b'Set-Cookie: '
        res = self._request('/login.cgi', 'POST', f'login={login}&password={password}', parse_json=False)
        head = res.split(b'\r\n\r\n')[0]
        self.cookies = b'; '.join(line[len(cookie_mark):] for line in head.split(b'\r\n')
                                  if line.startswith(cookie_mark)).decode()
        return bool(self.cookies)

    def dump_cookies(self, path):
        """Save cookies to file"""
        with open(path, 'w') as f:
            f.write(self.cookies)

    def load_cookies(self, path):
        """Load cookies from file"""
        with open(path) as f:
            self.cookies = f.read()

    def reboot(self):
        """Send reboot CGI request, do not wait response"""
        self._request('/reboot.cgi', add_timestamp=True, no_response=True)

    def _build_upload_request(self, fw_path, filename):
        """Read the firmware file and return the complete multipart ``POST /upload.cgi`` request as bytes."""
        boundary = '---------------------------69299438174861'
        with open(fw_path, 'rb') as f:
            data = f.read()
        body = b''.join((b'-----------------------------69299438174861\r\n',
                         'Content-Disposition: form-data; name="file1"; filename="{}"\r\n'.format(filename).encode(),
                         b'Content-Type: application/octet-stream\r\n\r\n', data,
                         b'\r\n-----------------------------69299438174861--\r\n'))
        req = 'POST /upload.cgi HTTP/1.1\r\nContent-Type: multipart/form-data; boundary={}\r\n' \
              'Content-Length: {}\r\nCookies: {}\r\n\r\n'.format(boundary, len(body), self.cookies).encode()
        return req + body

    def update_fw_m2(self, fw_path, part=4096, callback=None):
        """Update M2 firmware from file.

        :param fw_path: path to the firmware binary
        :param part: number of request bytes written per ``sendall`` call
        :param callback: optional callable invoked as ``callback(offset, part, total)`` after each part
        :return: ``True`` when the reply contains ``200 OK`` and ends with ``1``
        """
        request = self._build_upload_request(fw_path, 'MT_M01.bin')
        length = len(request)
        with socket.socket() as sock:
            sock.connect((self.ip, self.HTTP_PORT))
            for i in range(0, length, part):
                sock.sendall(request[i: i + part])
                if callable(callback):
                    callback(i, part, length)
            res = sock.recv(4096)
        return b'200 OK' in res and res.endswith(b'1')

    def update_fw_m3(self, fw_path, part=800, callback=None):
        """Update M3 firmware from file.

        The request is written in small parts with pauses in between; a part
        that cannot be fully sent within the 1 s socket timeout is retried
        from the first unsent byte. Progress is printed to stdout.

        :param fw_path: path to the firmware binary
        :param part: number of request bytes per part
        :param callback: optional callable invoked as ``callback(offset, part, total)`` after each part
        :return: ``True`` when the reply contains ``200 OK`` and ends with ``1``
        """
        request = self._build_upload_request(fw_path, 'MT_M03.bin')
        length = len(request)
        with socket.socket() as sock:
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            sock.connect((self.ip, self.HTTP_PORT))
            sock.settimeout(1)
            for i in range(0, length, part):
                chunk = min(i + part, length)
                offset = 0
                current_part_length = chunk - i
                while offset < current_part_length:
                    try:
                        r = sock.send(request[i + offset: chunk])
                    except (BlockingIOError, socket.timeout):
                        r = 0
                        print(i)
                    print(f'Sent {r} bytes (from {i + offset} to {chunk} of {length})')
                    offset += r
                    if offset < current_part_length:
                        # Part not fully sent: pause longer before retrying the remainder.
                        print('sleep on fail')
                        time.sleep(2)
                    else:
                        print('sleep on ok')
                        time.sleep(1)
                if callable(callback):
                    callback(i, part, length)
            time.sleep(1)
            start_time = time.monotonic()
            res = b'No any data'
            print('Reading response...')
            # Poll for the reply for up to 120 s; each recv waits at most the 1 s socket timeout.
            while time.monotonic() - start_time < 120:
                try:
                    print('Try recv...')
                    res = sock.recv(4096)
                    print(res)
                    break
                except (BlockingIOError, socket.timeout) as e:
                    time.sleep(0.5)
                    print(e)
                    res = b'No response'
        print(res)
        return b'200 OK' in res and res.endswith(b'1')

    def update_fw(self, fw_path, part=4096, callback=None):
        """Update Mx firmware from file, picking the M3 or M2 procedure from ``self.model``.

        NOTE(review): ``part`` defaults to 4096 here and is forwarded as-is, so the
        M3 path does not get ``update_fw_m3``'s own default of 800 - confirm this is intended.
        """
        if 'M3' in str(self.model):
            return self.update_fw_m3(fw_path, part, callback)
        return self.update_fw_m2(fw_path, part, callback)

    def set_system_variables(self, serial, mac, production_date_str):
        """Set serial number, MAC address and production date through an HTTP GET request.

        The request is retried once after a connection failure.

        :return: the next announce received from the device after a successful request
        :raises DeviceM2Error: when the reply does not contain ``200``
        """
        sysvars_dict = dict(serial=serial, mac=mac, proddate=production_date_str)
        print(sysvars_dict)
        url = '/service_set_sysvars.cgi?{}'.format(urlencode(sysvars_dict))
        try:
            res = self._request(url, parse_json=False)
        except DeviceM2ConnectionFail:
            res = self._request(url, parse_json=False)
        if b'200' not in res:
            raise DeviceM2Error('Ошибка при установке системных переменных')
        print("return 200")
        time.sleep(1)
        return self.wait_announce()

    def set_status(self, status):
        """Send ``status`` as the ``status_fail`` test-state value and check the next announce.

        :return: ``True`` when the next announce reports ``status``
        """
        sysvars_dict = dict(status_fail=status)
        try:
            self._request('/service_set_test_state.cgi?{}'.format(urlencode(sysvars_dict)), parse_json=False)
        except DeviceM2Error:
            # Best effort: the announce comparison below is the actual verification.
            pass
        return self.wait_announce().status == status

    def set_led(self, led, color, state, freq=None):
        """Control M2 LEDs: server and status, color ::= r | g, state ::= on | off | blink

        :param freq: optional value sent as the integer ``freq`` query parameter
        """
        # NOTE(review): asserts are stripped under ``python -O``; kept so callers still see AssertionError.
        assert led in ('server', 'status')
        assert color in ('r', 'g')
        assert state in ('on', 'off', 'blink')
        url = '/service_led.cgi?led={}&color={}&state={}'.format(led, color, state)
        if freq is not None:
            url += '&freq={}'.format(int(freq))
        self._request(url, parse_json=False)

    def reboot_to_iap(self):
        """Reboot device to IAP mode using GET request (for M3 only now)"""
        return self._request('/goboot.cgi', no_response=True)

    def check_iap(self):
        """Check if the device is in IAP mode, i.e. its root page refers to ``/upload.html`` (for M3 only now)"""
        return b'url=/upload.html' in self._request('/', parse_json=False)

    def get_info(self):
        """Get info JSON from WEB interface"""
        return self._request('/info.cgi', add_timestamp=True)

    def get_p_info(self):
        """Get info for long test from special CGI page"""
        return self._request('/progon.cgi')

    def get_settings(self):
        """Get settings JSON from WEB interface"""
        return self._request('/settings.cgi', add_timestamp=True)

    def confirm(self):
        """Send the confirm CGI request and wait for (but discard) the response"""
        self._request('/confirm.cgi', add_timestamp=True, no_response=False, parse_json=False)

    @staticmethod
    def _rectify_settings(settings):
        """Convert settings JSON from tree to plain structure.

        The ``gsm`` and ``pgw`` lists are removed from ``settings`` and each of
        their entries is flattened to ``gsm<N>_<name>`` / ``pgw<N>_<name>`` keys
        (N counted from 1, ``<name>`` being the original key without its first
        four characters). ``settings`` is modified in place and returned.
        """
        up = dict()
        for group in ('gsm', 'pgw'):
            for i, obj in enumerate(settings.pop(group), start=1):
                for k, v in obj.items():
                    up['{}{}_{}'.format(group, i, k[4:])] = v
        settings.update(up)
        return settings

    @staticmethod
    def _to_http_form_value(value):
        """Convert value to HTTP form format: bool -> 'on' / '', float -> integer text, else ``str(value)``"""
        if isinstance(value, bool):
            return 'on' if value else ''
        if isinstance(value, float):
            return str(int(value))
        return str(value)

    def _set_settings(self, url, no_response, settings_dict):
        """Common function for changing M2 settings using HTTP form POSTing.

        The current settings are fetched, flattened and converted to form
        values, then overridden by ``settings_dict``; a ``None`` value removes
        the key from the form, other values are posted as given.

        :return: raw response bytes, or ``None`` when ``no_response`` is true
        """
        r_settings = self._rectify_settings(self.get_settings())
        new_settings = OrderedDict()
        new_settings['cursor'] = '0'
        new_settings['password'] = ''
        for k in self.SETTINGS_FORM_KEYS:
            if k in r_settings:
                new_settings[k] = self._to_http_form_value(r_settings[k])
        for k, v in settings_dict.items():
            if v is None:
                new_settings.pop(k, None)
            else:
                new_settings[k] = v
        return self._request(url, 'POST', urlencode(new_settings), parse_json=False,
                             add_timestamp=False, no_response=no_response)

    def set_settings(self, **kwargs):
        """Set ordinary settings for M2"""
        return self._set_settings('/settings.cgi', False, kwargs)

    def set_settings_reboot(self, **kwargs):
        """Set settings causes reboot for M2 - do not wait response"""
        return self._set_settings('/settings.cgi', True, kwargs)

    def set_settings_autoconfirm(self, **kwargs):
        """Set settings with auto-confirmation for M2, use service URL"""
        return self._set_settings('/settings_service.cgi', False, kwargs)

    def reset_settings(self):
        """Reset device settings to default"""
        self._request('/reset.cgi', parse_json=False, add_timestamp=True)
def main(ip: str, path: str):
    """Bench helper: log in to the device at ``ip`` as an M3 board and write fixed system variables.

    ``path`` is currently unused (the firmware-update call that consumed it is disabled).
    The announce returned by ``set_system_variables`` is printed.
    """
    # Other addresses used previously: '178.176.41.47', '192.168.10.254'.
    device = DeviceM2(ip)
    device.model = 'M3'
    device.login()
    # Disabled alternatives: device.set_led('status', 'g', 'on'); print(device.get_info());
    # device.update_fw_m3(path) followed by time.sleep(10).
    announce = device.set_system_variables('7347382', 'EC-4C-4D-02-0E-D6', '05.07.2023+17:41')
    print(announce)
    # Sample info dictionary kept for reference (presumably captured from a device):
    # {'uptime': '0 дн. 0 ч. 4 мин.', 'comment': '', 'incharge': '', 'digit_id': 0.0,
    #  'fwversion': '2.014', 'iapversion': '1.03', 'owner': '', 'macaddr': 'EC-4C-4D-02-0E-D6',
    #  'model': 'Метролог M3', 'mboard_rev': '7', 'dboard_rev': '7', 'mfr': 'АО "НПК РоТеК"',
    #  'sysLocation': '', 'prodate': '06.07.2023+17:41', 'serno': '7347382',
    #  'netsettings_changed': False, 'utm': 1760983071.0}
if __name__ == '__main__':
    # Usage: <script> [ip fw_binary_path]
    # With exactly two arguments they are used as the device IP and firmware path;
    # otherwise the previous bench defaults are used, so a bare invocation behaves as before.
    if len(sys.argv) == 3:
        main(sys.argv[1], sys.argv[2])
    else:
        main("192.168.1.101", "fw.bin")
|