|
@@ -0,0 +1,318 @@
|
|
|
+#!/usr/bin/python
|
|
|
+
|
|
|
+import sys, getopt, time
|
|
|
+import debug, json, settings
|
|
|
+from datetime import datetime
|
|
|
+from transport import PortgwSerial
|
|
|
+from transport import PortgwTcp
|
|
|
+
|
|
|
+
|
|
|
+def usage():
|
|
|
+ print "Usage:\r\n\
|
|
|
+ --ip=<address> \t\tIP adress to connect to\r\n\
|
|
|
+ --port=<number> \tPort number to connect to\r\n\
|
|
|
+ --serial=<device> \tSerial port to use\r\n\
|
|
|
+ --baud=<baudrate> \tbaudrate\r\n\
|
|
|
+ --bytesize=<number> \tbyte size in bits\r\n\
|
|
|
+ --parity=['N'|'O'|'E'] \tparity: none, odd, even\r\n\
|
|
|
+ --stopbits=<number> \tnumber of stop bits\r\n\
|
|
|
+ --all \t\t\tcheck all UART modes\r\n\
|
|
|
+ --help \t\t\tprint usage info\r\n\
|
|
|
+ --debug \t\tenable debug output"
|
|
|
+
|
|
|
+
|
|
|
+def print_bitrate(datalen, duration):
|
|
|
+ print "%.1f sec, %d bps" %(duration, datalen * 8 / duration)
|
|
|
+
|
|
|
+
|
|
|
+def test_src2dst(src, dst, data):
|
|
|
+ print "%s -> %s (%d bytes)" %(src.name, dst.name, len(data))
|
|
|
+
|
|
|
+ if not src.open():
|
|
|
+ print "%s open failed" %(src.name)
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ if not dst.open():
|
|
|
+ print "%s open failed" %(dst.name)
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ txdata = data
|
|
|
+ debug.print_debug("%s TX (%d bytes): %s" %(src.name, len(txdata), txdata[:10] + "..."))
|
|
|
+
|
|
|
+ timestamp = time.time()
|
|
|
+
|
|
|
+ res = src.write(txdata)
|
|
|
+ if res <= 0:
|
|
|
+ print "%s write fail" %(src.name)
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ rxdata = dst.read(len(txdata), 1)
|
|
|
+ if res == None:
|
|
|
+ print "%s read fail" %(dst.name)
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ debug.print_debug("%s RX (%d bytes): %s" %(dst.name, len(rxdata), rxdata[:10] + "..."))
|
|
|
+ if rxdata != txdata:
|
|
|
+ debug.print_data("TX:\n", txdata)
|
|
|
+ debug.print_data("RX:\n", rxdata)
|
|
|
+
|
|
|
+ print "Test failed:"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ print_bitrate(len(data), time.time() - timestamp)
|
|
|
+
|
|
|
+ print "OK\n"
|
|
|
+ dst.close()
|
|
|
+ src.close()
|
|
|
+
|
|
|
+
|
|
|
+def test_echo(src, dst, data):
|
|
|
+ print "%s -> %s -> %s -> %s (%d bytes)" %(src.name, dst.name, dst.name, src.name, len(data))
|
|
|
+
|
|
|
+ if not src.open():
|
|
|
+ print "TCP open failed"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ if not dst.open():
|
|
|
+ print "UART open failed"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ txdata = data
|
|
|
+ debug.print_debug("%s TX (%d bytes): %s" %(src.name, len(txdata), txdata[:10] + "..."))
|
|
|
+
|
|
|
+ timestamp = time.time()
|
|
|
+
|
|
|
+ res = src.write(txdata)
|
|
|
+ if res <= 0:
|
|
|
+ print "Write fail"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ echo = dst.read(len(txdata))
|
|
|
+ if res == None:
|
|
|
+ print "Read fail"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ debug.print_debug("%s ECHO (%d bytes): %s" %(dst.name, len(echo), echo[:10] + "..."))
|
|
|
+ res = dst.write(echo)
|
|
|
+ if res <= 0:
|
|
|
+ print "Write fail"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ rxdata = src.read(len(echo))
|
|
|
+ if res == None:
|
|
|
+ print "Read fail"
|
|
|
+ sys.exit()
|
|
|
+ debug.print_debug("%s RX (%d bytes): %s" %(src.name, len(rxdata), rxdata[:10] + "..."))
|
|
|
+
|
|
|
+ if rxdata != txdata:
|
|
|
+ print "Fail"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ #Double data length (echo)
|
|
|
+ print_bitrate(len(data) * 2, time.time() - timestamp)
|
|
|
+
|
|
|
+ print "OK\n"
|
|
|
+ src.close()
|
|
|
+ dst.close()
|
|
|
+
|
|
|
+
|
|
|
+def test_mult_requests(src, dst, data, duration):
|
|
|
+ print "%s -> %s -> %s -> %s (%d bytes, multi: %d sec)" %(src.name, dst.name, dst.name, src.name,
|
|
|
+ len (data), duration)
|
|
|
+
|
|
|
+ if not src.open():
|
|
|
+ print "TCP open failed"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ if not dst.open():
|
|
|
+ print "UART open failed"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ txdata = data
|
|
|
+ req_count = 0
|
|
|
+ timestamp = time.time()
|
|
|
+
|
|
|
+ while time.time() - timestamp < duration:
|
|
|
+ print "\rDuration: %.1f/%.1f sec, request: %d" % (time.time() - timestamp, duration, req_count + 1),
|
|
|
+ sys.stdout.flush()
|
|
|
+ debug.print_debug("%s TX (%d bytes): %s" %(src.name, len(txdata), txdata[:10] + "..."))
|
|
|
+ res = src.write(txdata)
|
|
|
+ if res <= 0:
|
|
|
+ print "Write fail"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ echo = dst.read(len(txdata))
|
|
|
+ if res == None:
|
|
|
+ print "Read fail"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ debug.print_debug("%s ECHO (%d bytes): %s" %(dst.name, len(echo), echo[:10] + "..."))
|
|
|
+ res = dst.write(echo)
|
|
|
+ if res <= 0:
|
|
|
+ print "Write fail"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ rxdata = src.read(len(echo))
|
|
|
+ if res == None:
|
|
|
+ print "Read fail"
|
|
|
+ sys.exit()
|
|
|
+ debug.print_debug("%s RX (%d bytes): %s\n" %(src.name, len(rxdata), rxdata[:10] + "..."))
|
|
|
+
|
|
|
+ if rxdata != txdata:
|
|
|
+ print "Fail"
|
|
|
+ sys.exit()
|
|
|
+
|
|
|
+ req_count += 1
|
|
|
+
|
|
|
+ print "\nTest duration: %d sec, %d requests completed" %(duration, req_count)
|
|
|
+ print "OK\n"
|
|
|
+ src.close()
|
|
|
+ dst.close()
|
|
|
+
|
|
|
+
|
|
|
+def test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits):
|
|
|
+ #Test data
|
|
|
+ test_packet = "AABBCCDD"*10
|
|
|
+
|
|
|
+ #Configure hosts's UART
|
|
|
+ uart.close()
|
|
|
+ uart.baudrate = baudrate
|
|
|
+ uart.bytesize = bytesize
|
|
|
+ uart.parity = parity
|
|
|
+ uart.stopbits = stopbits
|
|
|
+ uart.open()
|
|
|
+
|
|
|
+ # Configure device's UART
|
|
|
+ if not settings.change(ip, session, baudrate, bytesize, parity, stopbits):
|
|
|
+ print "UART settings set fail"
|
|
|
+ sys.exit(2)
|
|
|
+ else:
|
|
|
+ print "Settings: %d %d%s%d" %(baudrate, bytesize, parity, stopbits)
|
|
|
+
|
|
|
+
|
|
|
+ # UART -> TCP
|
|
|
+ print "-"*10
|
|
|
+ test_src2dst(uart, tcp, test_packet)
|
|
|
+
|
|
|
+
|
|
|
+ # TCP -> UART
|
|
|
+ print "-"*10
|
|
|
+ test_src2dst(tcp, uart, test_packet)
|
|
|
+
|
|
|
+
|
|
|
+ # TCP echo: TCP -> UART -> UART -> TCP
|
|
|
+ print "-"*10
|
|
|
+ test_echo(tcp, uart, test_packet)
|
|
|
+
|
|
|
+
|
|
|
+ # UART echo: UART -> TCP -> TCP -> UART
|
|
|
+ print "-"*10
|
|
|
+ test_echo(uart, tcp, test_packet)
|
|
|
+
|
|
|
+
|
|
|
+ # TCP -> UART -> UART -> TCP (multiple requests)
|
|
|
+ print "-"*10
|
|
|
+ test_mult_requests(tcp, uart, "AABBCCDD"*16, 5)
|
|
|
+
|
|
|
+
|
|
|
+ # UART -> TCP -> TCP -> UART (multiple requests)
|
|
|
+ print "-"*10
|
|
|
+ test_mult_requests(uart, tcp, "AABBCCDD"*16, 5)
|
|
|
+
|
|
|
+
|
|
|
+def main():
|
|
|
+ check_all = False
|
|
|
+ ip = None
|
|
|
+ port = 1001
|
|
|
+ serial = '/dev/ttyUSB0'
|
|
|
+ baudrate = 115200
|
|
|
+ bytesize = 8
|
|
|
+ parity = 'N'
|
|
|
+ stopbits = 1
|
|
|
+
|
|
|
+ try:
|
|
|
+ opts, args = getopt.getopt(sys.argv[1:], "hs:i:p:b:z:y:t:da", \
|
|
|
+ ["help", "serial=", "ip=", "port=", "baud=", "bytesize=",
|
|
|
+ "parity=", "stopbits=", "debug", "all"])
|
|
|
+ except getopt.GetoptError:
|
|
|
+ usage()
|
|
|
+ sys.exit(2)
|
|
|
+
|
|
|
+ for opt, arg in opts:
|
|
|
+ if opt in ("-h", "--help"):
|
|
|
+ usage()
|
|
|
+ sys.exit()
|
|
|
+ if opt in ("-s", "--serial"):
|
|
|
+ serial = arg
|
|
|
+ if opt in ("-b", "--baud"):
|
|
|
+ if (arg.isdigit()):
|
|
|
+ baudrate = int(arg)
|
|
|
+ else:
|
|
|
+ usage()
|
|
|
+ sys.exit()
|
|
|
+ if opt in ("-z", "--bytesize"):
|
|
|
+ if (arg.isdigit() and arg in ('7', '8')):
|
|
|
+ bytesize = int(arg)
|
|
|
+ else:
|
|
|
+ print arg
|
|
|
+ usage()
|
|
|
+ sys.exit()
|
|
|
+ if opt in ("-y", "--parity"):
|
|
|
+ if (arg in ('N', 'O', 'E')):
|
|
|
+ parity = arg
|
|
|
+ else:
|
|
|
+ usage()
|
|
|
+ sys.exit()
|
|
|
+ if opt in ("-t", "--stopbits"):
|
|
|
+ if (arg in ('1', '2')):
|
|
|
+ stopbits = int(arg)
|
|
|
+ else:
|
|
|
+ usage()
|
|
|
+ sys.exit()
|
|
|
+ if opt in ("-i", "--ip"):
|
|
|
+ ip = arg
|
|
|
+ if opt in ("-p", "--port"):
|
|
|
+ if (arg.isdigit()):
|
|
|
+ port = int(arg)
|
|
|
+ if opt in ("-d", "--debug"):
|
|
|
+ debug.DEBUG = True
|
|
|
+ if opt in ("-a", "--all"):
|
|
|
+ check_all = True
|
|
|
+
|
|
|
+ if ip == None:
|
|
|
+ usage()
|
|
|
+ sys.exit(2)
|
|
|
+
|
|
|
+ tcp = PortgwTcp(ip, port)
|
|
|
+ uart = PortgwSerial(serial, baudrate, bytesize, parity, stopbits)
|
|
|
+
|
|
|
+ report = []
|
|
|
+
|
|
|
+ session = settings.login(ip)
|
|
|
+ if session == None:
|
|
|
+ sys.exit(2)
|
|
|
+
|
|
|
+ if check_all:
|
|
|
+ for baudrate in (1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200):
|
|
|
+ for bytesize in (7, 8):
|
|
|
+ for parity in ('N', 'O', 'E'):
|
|
|
+ #Skip unsupported modes
|
|
|
+ if bytesize == 7 and parity == 'N':
|
|
|
+ continue
|
|
|
+ if stopbits in (1, 2):
|
|
|
+ #Run tests
|
|
|
+ test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits)
|
|
|
+ report.append(str(baudrate) + " " + str(bytesize) + parity + str(stopbits))
|
|
|
+ else:
|
|
|
+ test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits)
|
|
|
+ report.append(str(baudrate) + " " + str(bytesize) + parity + str(stopbits))
|
|
|
+
|
|
|
+ print "Checked:"
|
|
|
+ for i in report:
|
|
|
+ print i
|
|
|
+
|
|
|
+ print "\rAll done"
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ main()
|