#!/usr/bin/python
"""Loopback test tool for a serial <-> TCP port gateway.

The script drives two transports (a local serial port and a TCP connection to
the device), pushes a known payload through one side and checks that the same
payload shows up on the other side, in both directions, as a single transfer,
as an echo round trip and as repeated echo round trips for a fixed duration.

Every ``print`` below is called with exactly one pre-formatted string, so the
line behaves the same whether ``print`` is a statement or a function.
"""
import getopt
import json
import sys
import time
from datetime import datetime

import debug
import settings
from transport import PortgwSerial
from transport import PortgwTcp

# UART modes exercised by --all.
_ALL_BAUDRATES = (1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200)
_BYTESIZES = (7, 8)
_PARITIES = ('N', 'O', 'E')
_STOPBITS = (1, 2)

# Help text, one entry per output line (joined with "\r\n" when printed).
_USAGE_LINES = (
    "Usage:",
    " --ip=<address>\t\tIP address to connect to",
    " --port=<number>\tPort number to connect to",
    " --serial=<device>\tSerial port to use",
    " --baud=<rate>\t\tbaudrate",
    " --bytesize=<bits>\tbyte size in bits",
    " --parity=['N'|'O'|'E']\tparity: none, odd, even",
    " --stopbits=<count>\tnumber of stop bits",
    " --all\t\t\tcheck all UART modes",
    " --help\t\t\tprint usage info",
    " --debug\t\tenable debug output",
)


def usage():
    """Print the command-line help text."""
    print("\r\n".join(_USAGE_LINES))


def print_bitrate(datalen, duration):
    """Print the elapsed time and the throughput in bits per second.

    datalen  -- number of bytes transferred
    duration -- elapsed time in seconds
    """
    # A zero duration would otherwise raise ZeroDivisionError.
    bps = datalen * 8 / duration if duration > 0 else 0
    print("%.1f sec, %d bps" % (duration, bps))


def _fail(message, status=1):
    """Print *message* and terminate the process with exit code *status*."""
    print(message)
    sys.exit(status)


def _usage_error():
    """Print the help text and exit with status 2 (bad command line)."""
    usage()
    sys.exit(2)


def _preview(data):
    """Return the first 10 items of *data* followed by "..." for debug output."""
    return data[:10] + "..."


def _open_pair(src, dst):
    """Open *src* and then *dst*; exit if either open() reports failure."""
    for port in (src, dst):
        if not port.open():
            _fail("%s open failed" % port.name)


def _send(port, data):
    """Write *data* to *port*; exit if the write result is None or <= 0."""
    written = port.write(data)
    if written is None or written <= 0:
        _fail("%s write fail" % port.name)


def _receive(port, length, *read_args):
    """Read *length* bytes from *port* and return them; exit if read() gives None.

    Extra positional arguments are passed through to ``port.read`` unchanged.
    """
    received = port.read(length, *read_args)
    if received is None:
        _fail("%s read fail" % port.name)
    return received


def _echo_round_trip(src, dst, data, rx_suffix=""):
    """Send *data* src -> dst, send what dst received back, and verify it.

    Exits the process if any write/read fails or if the data that comes back
    to *src* differs from *data*. *rx_suffix* is appended to the final RX
    debug line.
    """
    _send(src, data)
    echo = _receive(dst, len(data))
    debug.print_debug("%s ECHO (%d bytes): %s"
                      % (dst.name, len(echo), _preview(echo)))

    _send(dst, echo)
    rxdata = _receive(src, len(echo))
    debug.print_debug("%s RX (%d bytes): %s%s"
                      % (src.name, len(rxdata), _preview(rxdata), rx_suffix))

    if rxdata != data:
        _fail("Fail")


def test_src2dst(src, dst, data):
    """One-way test: write *data* to *src* and expect the same data on *dst*.

    src, dst -- transport objects providing name, open(), write(), read()
                and close()
    data     -- payload to send

    Prints the measured bitrate and "OK" on success, then closes both
    transports. Exits the process with status 1 on any failure.
    """
    print("%s -> %s (%d bytes)" % (src.name, dst.name, len(data)))
    _open_pair(src, dst)

    debug.print_debug("%s TX (%d bytes): %s"
                      % (src.name, len(data), _preview(data)))
    started = time.time()
    _send(src, data)
    # NOTE(review): the second read() argument is presumably a timeout in
    # seconds - confirm against the transport implementation.
    rxdata = _receive(dst, len(data), 1)
    debug.print_debug("%s RX (%d bytes): %s"
                      % (dst.name, len(rxdata), _preview(rxdata)))

    if rxdata != data:
        debug.print_data("TX:\n", data)
        debug.print_data("RX:\n", rxdata)
        _fail("Test failed:")

    print_bitrate(len(data), time.time() - started)
    print("OK\n")
    dst.close()
    src.close()


def test_echo(src, dst, data):
    """Echo test: *data* goes src -> dst, is written back dst -> src, and must match.

    Prints the bitrate (counting the payload twice, once per direction) and
    "OK" on success, then closes both transports. Exits the process with
    status 1 on any failure.
    """
    print("%s -> %s -> %s -> %s (%d bytes)"
          % (src.name, dst.name, dst.name, src.name, len(data)))
    _open_pair(src, dst)

    debug.print_debug("%s TX (%d bytes): %s"
                      % (src.name, len(data), _preview(data)))
    started = time.time()
    _echo_round_trip(src, dst, data)

    # Double data length (echo)
    print_bitrate(len(data) * 2, time.time() - started)
    print("OK\n")
    src.close()
    dst.close()


def test_mult_requests(src, dst, data, duration):
    """Repeat the echo round trip back-to-back for *duration* seconds.

    A progress line is rewritten in place (leading "\\r") before each request.
    Prints the number of completed requests and "OK" on success, then closes
    both transports. Exits the process with status 1 on any failure.
    """
    print("%s -> %s -> %s -> %s (%d bytes, multi: %d sec)"
          % (src.name, dst.name, dst.name, src.name, len(data), duration))
    _open_pair(src, dst)

    req_count = 0
    started = time.time()
    while time.time() - started < duration:
        # Written without a newline so the next update overwrites this line.
        sys.stdout.write("\rDuration: %.1f/%.1f sec, request: %d "
                         % (time.time() - started, duration, req_count + 1))
        sys.stdout.flush()

        debug.print_debug("%s TX (%d bytes): %s"
                          % (src.name, len(data), _preview(data)))
        _echo_round_trip(src, dst, data, rx_suffix="\n")
        req_count += 1

    print("\nTest duration: %d sec, %d requests completed"
          % (duration, req_count))
    print("OK\n")
    src.close()
    dst.close()


def test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits):
    """Run every test in both directions for one UART mode.

    The local serial port is reconfigured first, then the same mode is pushed
    to the device through ``settings.change``. Exits with status 2 if the
    device settings cannot be applied.
    """
    # Test data
    test_packet = "AABBCCDD" * 10
    multi_packet = "AABBCCDD" * 16
    multi_duration = 5

    # Configure host's UART
    uart.close()
    uart.baudrate = baudrate
    uart.bytesize = bytesize
    uart.parity = parity
    uart.stopbits = stopbits
    uart.open()

    # Configure device's UART
    if not settings.change(ip, session, baudrate, bytesize, parity, stopbits):
        _fail("UART settings set fail", 2)
    print("Settings: %d %d%s%d" % (baudrate, bytesize, parity, stopbits))

    steps = (
        # UART -> TCP
        (test_src2dst, (uart, tcp, test_packet)),
        # TCP -> UART
        (test_src2dst, (tcp, uart, test_packet)),
        # TCP echo: TCP -> UART -> UART -> TCP
        (test_echo, (tcp, uart, test_packet)),
        # UART echo: UART -> TCP -> TCP -> UART
        (test_echo, (uart, tcp, test_packet)),
        # TCP -> UART -> UART -> TCP (multiple requests)
        (test_mult_requests, (tcp, uart, multi_packet, multi_duration)),
        # UART -> TCP -> TCP -> UART (multiple requests)
        (test_mult_requests, (uart, tcp, multi_packet, multi_duration)),
    )
    for run_test, test_args in steps:
        print("-" * 10)
        run_test(*test_args)


def _int_option(arg, allowed=None):
    """Convert option value *arg* to int, or print usage and exit(2).

    Without *allowed* any string of digits is accepted; with *allowed* the
    text must be exactly the decimal form of one of the allowed integers.
    """
    if allowed is None:
        valid = arg.isdigit()
    else:
        valid = arg in [str(choice) for choice in allowed]
    if not valid:
        _usage_error()
    return int(arg)


def _all_modes():
    """Return every (baudrate, bytesize, parity, stopbits) mode checked by --all."""
    modes = []
    for baudrate in _ALL_BAUDRATES:
        for bytesize in _BYTESIZES:
            for parity in _PARITIES:
                # Skip unsupported modes
                if bytesize == 7 and parity == 'N':
                    continue
                for stopbits in _STOPBITS:
                    modes.append((baudrate, bytesize, parity, stopbits))
    return modes


def main():
    """Parse the command line, log in to the device and run the test suite.

    Exit status: 0 on success or --help, 2 on a bad command line, missing
    --ip, failed login or failed settings change, 1 on a failed test.
    """
    check_all = False
    ip = None
    port = 1001
    serial = '/dev/ttyUSB0'
    baudrate = 115200
    bytesize = 8
    parity = 'N'
    stopbits = 1

    try:
        opts, _unused_args = getopt.getopt(
            sys.argv[1:], "hs:i:p:b:z:y:t:da",
            ["help", "serial=", "ip=", "port=", "baud=", "bytesize=",
             "parity=", "stopbits=", "debug", "all"])
    except getopt.GetoptError:
        _usage_error()

    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit()
        elif opt in ("-s", "--serial"):
            serial = arg
        elif opt in ("-b", "--baud"):
            baudrate = _int_option(arg)
        elif opt in ("-z", "--bytesize"):
            bytesize = _int_option(arg, _BYTESIZES)
        elif opt in ("-y", "--parity"):
            if arg not in _PARITIES:
                _usage_error()
            parity = arg
        elif opt in ("-t", "--stopbits"):
            stopbits = _int_option(arg, _STOPBITS)
        elif opt in ("-i", "--ip"):
            ip = arg
        elif opt in ("-p", "--port"):
            port = _int_option(arg)
        elif opt in ("-d", "--debug"):
            debug.DEBUG = True
        elif opt in ("-a", "--all"):
            check_all = True

    if ip is None:
        _usage_error()

    tcp = PortgwTcp(ip, port)
    uart = PortgwSerial(serial, baudrate, bytesize, parity, stopbits)

    session = settings.login(ip)
    if session is None:
        sys.exit(2)

    if check_all:
        modes = _all_modes()
    else:
        modes = [(baudrate, bytesize, parity, stopbits)]

    report = []
    for mode in modes:
        # Run tests
        test_suite(uart, tcp, ip, session, *mode)
        report.append("%d %d%s%d" % mode)

    print("Checked:")
    for entry in report:
        print(entry)
    print("\rAll done")


if __name__ == '__main__':
    main()