#!/usr/bin/python # -*- coding: utf-8 -*- import os, random, sys, time, json, getopt, threading sys.path.append('../../stm32_prs/stm32_prs/branches/epu_tester') import extdevices import checklogfile reload(sys) sys.setdefaultencoding('utf8') login = "admin" password = "12345" def print_test_list(): print "\t\t1 - ON/OFF test\r\n\ \t2 - RAM buffer test\r\n\ \t3 - Log file download test" def usage(): print "Usage: test_log.py -t \r\n\ --help, -h \t\tprint usage info\r\n\ --test, -t \t\ttest number to run" print_test_list() def ping(hostname): response = os.system("ping -q -c1 -W1 " + hostname + ">> /dev/null") if response == 0: return True else: return False def do_login(): os.system("curl -s -b /tmp/cookies.txt -c /tmp/cookies.txt -d \"login=%s&password=%s\" http://%s/login.cgi > /dev/null" % (login, password, ip)) def do_logout(): os.system("curl -s -b /tmp/cookies.txt -c /tmp/cookies.txt http://%s/logout.cgi > /dev/null" %ip) def do_reset_sett(): os.system("curl -s -b /tmp/cookies.txt -c /tmp/cookies.txt \"http://%s/reset.cgi\" > /dev/null" %ip) def log_get_page(page): return os.popen("curl -s -b /tmp/cookies.txt -c /tmp/cookies.txt \"http://%s/history.cgi?page=%s&_=%s\" --compressed | jq -r -j '.page'" % (ip, page, random.randint(0, 0xFFFF))).read() def log_get_string(page, string): return os.popen("curl -s -b /tmp/cookies.txt -c /tmp/cookies.txt \"http://%s/history.cgi?page=%s&_=%s\" --compressed | jq -r -j '.page[%s]'" % (ip, page, random.randint(0, 0xFFFF), string)).read().decode('utf-8') def log_get_ramstrnum(): page = log_get_pagesnum() return os.popen("curl -s -b /tmp/cookies.txt -c /tmp/cookies.txt \"http://%s/history.cgi?page=%s&_=%s\" --compressed | jq -r -j '.page | length'" % (ip, page, random.randint(0, 0xFFFF))).read() def log_get_pagesnum(): return os.popen("curl -s -b /tmp/cookies.txt -c /tmp/cookies.txt \"http://%s/history.cgi?page=1&_=%s\" --compressed | jq -r -j '.pages'" % (ip, random.randint(0, 0xFFFF))).read() def log_download(path): print "Downloading log file..." if 0 == os.system("curl -b /tmp/cookies.txt -c /tmp/cookies.txt \"http://%s/history.cgi?page=all&_=%s\" --output %s --progress" % (ip, random.randint(0, 0xFFFF), path)): return True else: return False def log_add_message(): do_logout() do_login() time.sleep(1) def reboot(method): if method == "snmp": os.system("snmpset -v1 -t0.1 -r0 -c public %s .1.3.6.1.4.1.41752.5.17.2.9.1.0 i 1 > /dev/null" %ip) if method == "web": os.system("curl -s -b /tmp/cookies.txt -c /tmp/cookies.txt http://%s/reboot.cgi" %ip) else: print "Unknown method" def stop_spam_task(): res = os.popen("curl -s -b /tmp/cookies.txt -c /tmp/cookies.txt \"http://%s/led.cgi?state=%s&_=%s\" --compressed" % (ip, "off", random.randint(0, 0xFFFF))).read() if res == "OFF": print "Spam stoped" return True else: print "Error" return False def rambuf_test(): passed = False; results = [0]*10 if not ping(ip): print "Host not found" return False do_login() while len(filter(lambda x: x > 0, results)) < 10: while True: last_msg = log_get_string(1, 0) ram_num = int(log_get_ramstrnum()) if ram_num == 10: ram_num = 0; if results[ram_num] > 0: log_add_message() else: break print "\r\nLast message: ", last_msg print "Messages in RAM: %s" %ram_num reboot("web") time.sleep(10) do_login() for i in range(1, 10): page = log_get_page(i) print "\r\nPage " + str(i) + "\r\n" + page parsed_page = json.loads(page) if last_msg in parsed_page: str_num = parsed_page.index(last_msg) + 1 print "\r\nLast message found as %s-th message (page %s, string %s)" %(str_num + (i - 1) * 10 , i, str_num) results[ram_num] += 1 break if "Перезагрузка" in parsed_page: print "Error: Last message not found" return False print "\r\nResults:", results return True def on_off_test(): num = 0 #External device addr device = extdevices.io_device(num) msg = '' timestamp = '' res = False device.set_outputs(0x02) time.sleep(8) if not ping(ip): print "Host not found" return False while True: try: device.set_outputs(0x00) print "OFF" time.sleep(0.2) device.set_outputs(0x02) print "ON" time.sleep(4) time.sleep(random.randint(1, 4)) except KeyboardInterrupt: sys.exit(0) except ValueError: print "JSON parse error." stop_spam_task() return False except: print "Retry" stop_spam_task() return res def download_test_ram(start_num, add_num): print "RAM:", start_num, "+", add_num PATH = "/tmp/testlog_ram"+str(start_num)+"_"+str(add_num)+".csv" do_login() while True: last_msg = log_get_string(1, 0) ram_num = int(log_get_ramstrnum()) if ram_num == 10: ram_num = 0; if ram_num == start_num: break else: log_add_message() thr = threading.Thread(target=log_download, args=(PATH,), kwargs={}) thr.start() time.sleep(1) for i in range(add_num): log_add_message() thr.join() print "Last message:", last_msg fsize = os.path.getsize(PATH) print "File size: ", fsize if fsize != 120*30000 + 3 + 120*start_num: print "Error. File size missmatch" return False res = checklogfile.check(PATH) if not res: return False lines = checklogfile.parse(PATH) res = False for i in range(len(lines)): s = lines[i] if last_msg in s: res = True break if not res: print "Error. Last message not found" return False else: print "Last message found" do_logout() return True def download_test(): passed = False; results = [0]*10 if not ping(ip): print "Host not found" return False do_login() #Проверка добавления записи в лог во время загрузки файла (строка попала в RAM и вызвала запись страницы) if not download_test_ram(8, 5): return False #Проверка добавления записи в лог во время загрузки файла (строка попала в RAM, без записи страницы) if not download_test_ram(1, 2): return False #Проверка загрузки лога при количестве строк в RAM 0-9 do_login() while len(filter(lambda x: x > 0, results)) < 10: while True: last_msg = log_get_string(1, 0) ram_num = int(log_get_ramstrnum()) if ram_num == 10: ram_num = 0; if results[ram_num] > 0: log_add_message() else: break print "\r\nLast message: ", last_msg print "Messages in RAM: %s" %ram_num PATH = "/tmp/testlog_" + str(ram_num) + ".csv" thr = threading.Thread(target=log_download, args=(PATH,), kwargs={}) thr.start() thr.join() #log_download(PATH) fsize = os.path.getsize(PATH) print "File size: ", fsize res = checklogfile.check(PATH) if res: results[ram_num] += 1 else: return False lines = checklogfile.parse(PATH) res = False for i in range(len(lines)): s = lines[i] if last_msg in s: res = True break if not res: print "Error. Last message not found" return False else: print "Last message found" print "\r\nResults:", results return True test_list = [0, on_off_test, rambuf_test, download_test] def main(): testnum = 0 global ip try: opts, args = getopt.getopt(sys.argv[1:], "ht:", \ ["help", "test="]) except getopt.GetoptError: usage() sys.exit(2) for opt, arg in opts: if opt in ("-h", "--help"): usage() sys.exit() if opt in ("-t", "--test"): testnum = int(arg) if testnum == 0 or len(sys.argv) < 3: usage() sys.exit(2) ip = sys.argv[-1] #Run test by number if not test_list[testnum](): print "\r\nFailed\r\n" else: print "\r\nPassed\r\n" if __name__ == "__main__": main()