123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- #!/usr/bin/python
- import sys, getopt, time
- import debug, json, settings
- from datetime import datetime
- from transport import PortgwSerial
- from transport import PortgwTcp
- def usage():
- print "Usage:\r\n\
- --ip=<address> \t\tIP adress to connect to\r\n\
- --port=<number> \tPort number to connect to\r\n\
- --serial=<device> \tSerial port to use\r\n\
- --baud=<baudrate> \tbaudrate\r\n\
- --bytesize=<number> \tbyte size in bits\r\n\
- --parity=['N'|'O'|'E'] \tparity: none, odd, even\r\n\
- --stopbits=<number> \tnumber of stop bits\r\n\
- --all \t\t\tcheck all UART modes\r\n\
- --help \t\t\tprint usage info\r\n\
- --debug \t\tenable debug output"
- def print_bitrate(datalen, duration):
- print "%.1f sec, %d bps" %(duration, datalen * 8 / duration)
- def test_src2dst(src, dst, data):
- print "%s -> %s (%d bytes)" %(src.name, dst.name, len(data))
- if not src.open():
- print "%s open failed" %(src.name)
- sys.exit()
- if not dst.open():
- print "%s open failed" %(dst.name)
- sys.exit()
- txdata = data
- debug.print_debug("%s TX (%d bytes): %s" %(src.name, len(txdata), txdata[:10] + "..."))
- timestamp = time.time()
- res = src.write(txdata)
- if res <= 0:
- print "%s write fail" %(src.name)
- sys.exit()
- rxdata = dst.read(len(txdata), 1)
- if res == None:
- print "%s read fail" %(dst.name)
- sys.exit()
- debug.print_debug("%s RX (%d bytes): %s" %(dst.name, len(rxdata), rxdata[:10] + "..."))
- if rxdata != txdata:
- debug.print_data("TX:\n", txdata)
- debug.print_data("RX:\n", rxdata)
- print "Test failed:"
- sys.exit()
- print_bitrate(len(data), time.time() - timestamp)
- print "OK\n"
- dst.close()
- src.close()
- def test_echo(src, dst, data):
- print "%s -> %s -> %s -> %s (%d bytes)" %(src.name, dst.name, dst.name, src.name, len(data))
- if not src.open():
- print "TCP open failed"
- sys.exit()
- if not dst.open():
- print "UART open failed"
- sys.exit()
- txdata = data
- debug.print_debug("%s TX (%d bytes): %s" %(src.name, len(txdata), txdata[:10] + "..."))
- timestamp = time.time()
- res = src.write(txdata)
- if res <= 0:
- print "Write fail"
- sys.exit()
- echo = dst.read(len(txdata))
- if res == None:
- print "Read fail"
- sys.exit()
-
- debug.print_debug("%s ECHO (%d bytes): %s" %(dst.name, len(echo), echo[:10] + "..."))
- res = dst.write(echo)
- if res <= 0:
- print "Write fail"
- sys.exit()
- rxdata = src.read(len(echo))
- if res == None:
- print "Read fail"
- sys.exit()
- debug.print_debug("%s RX (%d bytes): %s" %(src.name, len(rxdata), rxdata[:10] + "..."))
- if rxdata != txdata:
- print "Fail"
- sys.exit()
- #Double data length (echo)
- print_bitrate(len(data) * 2, time.time() - timestamp)
- print "OK\n"
- src.close()
- dst.close()
- def test_mult_requests(src, dst, data, duration):
- print "%s -> %s -> %s -> %s (%d bytes, multi: %d sec)" %(src.name, dst.name, dst.name, src.name,
- len (data), duration)
- if not src.open():
- print "TCP open failed"
- sys.exit()
- if not dst.open():
- print "UART open failed"
- sys.exit()
- txdata = data
- req_count = 0
- timestamp = time.time()
- while time.time() - timestamp < duration:
- print "\rDuration: %.1f/%.1f sec, request: %d" % (time.time() - timestamp, duration, req_count + 1),
- sys.stdout.flush()
- debug.print_debug("%s TX (%d bytes): %s" %(src.name, len(txdata), txdata[:10] + "..."))
- res = src.write(txdata)
- if res <= 0:
- print "Write fail"
- sys.exit()
- echo = dst.read(len(txdata))
- if res == None:
- print "Read fail"
- sys.exit()
-
- debug.print_debug("%s ECHO (%d bytes): %s" %(dst.name, len(echo), echo[:10] + "..."))
- res = dst.write(echo)
- if res <= 0:
- print "Write fail"
- sys.exit()
- rxdata = src.read(len(echo))
- if res == None:
- print "Read fail"
- sys.exit()
- debug.print_debug("%s RX (%d bytes): %s\n" %(src.name, len(rxdata), rxdata[:10] + "..."))
- if rxdata != txdata:
- print "Fail"
- sys.exit()
- req_count += 1
- print "\nTest duration: %d sec, %d requests completed" %(duration, req_count)
- print "OK\n"
- src.close()
- dst.close()
- def test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits):
- #Test data
- test_packet = "AABBCCDD"*10
- #Configure hosts's UART
- uart.close()
- uart.baudrate = baudrate
- uart.bytesize = bytesize
- uart.parity = parity
- uart.stopbits = stopbits
- uart.open()
- # Configure device's UART
- if not settings.change(ip, session, baudrate, bytesize, parity, stopbits):
- print "UART settings set fail"
- sys.exit(2)
- else:
- print "Settings: %d %d%s%d" %(baudrate, bytesize, parity, stopbits)
- # UART -> TCP
- print "-"*10
- test_src2dst(uart, tcp, test_packet)
- # TCP -> UART
- print "-"*10
- test_src2dst(tcp, uart, test_packet)
- # TCP echo: TCP -> UART -> UART -> TCP
- print "-"*10
- test_echo(tcp, uart, test_packet)
- # UART echo: UART -> TCP -> TCP -> UART
- print "-"*10
- test_echo(uart, tcp, test_packet)
- # TCP -> UART -> UART -> TCP (multiple requests)
- print "-"*10
- test_mult_requests(tcp, uart, "AABBCCDD"*16, 5)
- # UART -> TCP -> TCP -> UART (multiple requests)
- print "-"*10
- test_mult_requests(uart, tcp, "AABBCCDD"*16, 5)
- def main():
- check_all = False
- ip = None
- port = 1001
- serial = '/dev/ttyUSB0'
- baudrate = 115200
- bytesize = 8
- parity = 'N'
- stopbits = 1
- try:
- opts, args = getopt.getopt(sys.argv[1:], "hs:i:p:b:z:y:t:da", \
- ["help", "serial=", "ip=", "port=", "baud=", "bytesize=",
- "parity=", "stopbits=", "debug", "all"])
- except getopt.GetoptError:
- usage()
- sys.exit(2)
- for opt, arg in opts:
- if opt in ("-h", "--help"):
- usage()
- sys.exit()
- if opt in ("-s", "--serial"):
- serial = arg
- if opt in ("-b", "--baud"):
- if (arg.isdigit()):
- baudrate = int(arg)
- else:
- usage()
- sys.exit()
- if opt in ("-z", "--bytesize"):
- if (arg.isdigit() and arg in ('7', '8')):
- bytesize = int(arg)
- else:
- print arg
- usage()
- sys.exit()
- if opt in ("-y", "--parity"):
- if (arg in ('N', 'O', 'E')):
- parity = arg
- else:
- usage()
- sys.exit()
- if opt in ("-t", "--stopbits"):
- if (arg in ('1', '2')):
- stopbits = int(arg)
- else:
- usage()
- sys.exit()
- if opt in ("-i", "--ip"):
- ip = arg
- if opt in ("-p", "--port"):
- if (arg.isdigit()):
- port = int(arg)
- if opt in ("-d", "--debug"):
- debug.DEBUG = True
- if opt in ("-a", "--all"):
- check_all = True
- if ip == None:
- usage()
- sys.exit(2)
- tcp = PortgwTcp(ip, port)
- uart = PortgwSerial(serial, baudrate, bytesize, parity, stopbits)
- report = []
- session = settings.login(ip)
- if session == None:
- sys.exit(2)
- if check_all:
- for baudrate in (1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200):
- for bytesize in (7, 8):
- for parity in ('N', 'O', 'E'):
- #Skip unsupported modes
- if bytesize == 7 and parity == 'N':
- continue
- if stopbits in (1, 2):
- #Run tests
- test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits)
- report.append(str(baudrate) + " " + str(bytesize) + parity + str(stopbits))
- else:
- test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits)
- report.append(str(baudrate) + " " + str(bytesize) + parity + str(stopbits))
- print "Checked:"
- for i in report:
- print i
- print "\rAll done"
- if __name__ == '__main__':
- main()
|