| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 | #!/usr/bin/pythonimport sys, getopt, timeimport debug, json, settingsfrom datetime import datetimefrom transport import PortgwSerialfrom transport import PortgwTcpdef usage():	print "Usage:\r\n\	--ip=<address> \t\tIP adress to connect to\r\n\	--port=<number> \tPort number to connect to\r\n\	--serial=<device> \tSerial port to use\r\n\	--baud=<baudrate> \tbaudrate\r\n\	--bytesize=<number> \tbyte size in bits\r\n\	--parity=['N'|'O'|'E'] \tparity: none, odd, even\r\n\	--stopbits=<number> \tnumber of stop bits\r\n\	--all \t\t\tcheck all UART modes\r\n\	--help \t\t\tprint usage info\r\n\	--debug \t\tenable debug output"def print_bitrate(datalen, duration):	print "%.1f sec, %d bps" %(duration, datalen * 8 / duration)def test_src2dst(src, dst, data):	print "%s -> %s (%d bytes)" %(src.name, dst.name, len(data))	if not src.open():		print "%s open failed" %(src.name)		sys.exit()	if not dst.open():		print "%s open failed" %(dst.name)		sys.exit()	txdata = data	debug.print_debug("%s TX (%d bytes): %s" %(src.name, len(txdata), txdata[:10] + "..."))	timestamp = time.time()	res = src.write(txdata)	if res <= 0:		print "%s write fail" %(src.name)		sys.exit()	rxdata = dst.read(len(txdata), 1)	if res == None:		print "%s read fail" %(dst.name)		sys.exit()	debug.print_debug("%s RX (%d bytes): %s" %(dst.name, len(rxdata), rxdata[:10] + "..."))	if rxdata != txdata:		debug.print_data("TX:\n", txdata)		debug.print_data("RX:\n", rxdata)		print "Test failed:"		sys.exit()	print_bitrate(len(data), time.time() - timestamp)	print "OK\n"	dst.close()	src.close()def test_echo(src, dst, data):	print "%s -> %s -> %s -> %s (%d bytes)" %(src.name, dst.name, dst.name, src.name, len(data))	if not src.open():		print "TCP open failed"		sys.exit()	if not dst.open():		print "UART open failed"		sys.exit()	txdata = data	debug.print_debug("%s TX (%d bytes): %s" %(src.name, len(txdata), txdata[:10] + "..."))	timestamp = time.time()	res = src.write(txdata)	if res <= 0:		print "Write fail"		sys.exit()	echo = dst.read(len(txdata))	if res == None:		print "Read fail"		sys.exit()		debug.print_debug("%s ECHO (%d bytes): %s" %(dst.name, len(echo), echo[:10] + "..."))	res = dst.write(echo)	if res <= 0:		print "Write fail"		sys.exit()	rxdata = src.read(len(echo))	if res == None:		print "Read fail"		sys.exit()	debug.print_debug("%s RX (%d bytes): %s" %(src.name, len(rxdata), rxdata[:10] + "..."))	if rxdata != txdata:		print "Fail"		sys.exit()	#Double data length (echo)	print_bitrate(len(data) * 2, time.time() - timestamp)		print "OK\n"	src.close()	dst.close()def test_mult_requests(src, dst, data, duration):	print "%s -> %s -> %s -> %s (%d bytes, multi: %d sec)" %(src.name, dst.name, dst.name, src.name, 		len (data), duration)	if not src.open():		print "TCP open failed"		sys.exit()	if not dst.open():		print "UART open failed"		sys.exit()	txdata = data	req_count = 0	timestamp = time.time()	while time.time() - timestamp < duration:		print "\rDuration: %.1f/%.1f sec, request: %d" % (time.time() - timestamp, duration, req_count + 1),		sys.stdout.flush()		debug.print_debug("%s TX (%d bytes): %s" %(src.name, len(txdata), txdata[:10] + "..."))		res = src.write(txdata)		if res <= 0:			print "Write fail"			sys.exit()		echo = dst.read(len(txdata))		if res == None:			print "Read fail"			sys.exit()				debug.print_debug("%s ECHO (%d bytes): %s" %(dst.name, len(echo), echo[:10] + "..."))		res = dst.write(echo)		if res <= 0:			print "Write fail"			sys.exit()		rxdata = src.read(len(echo))		if res == None:			print "Read fail"			sys.exit()		debug.print_debug("%s RX (%d bytes): %s\n" %(src.name, len(rxdata), rxdata[:10] + "..."))		if rxdata != txdata:			print "Fail"			sys.exit()		req_count += 1	print "\nTest duration: %d sec, %d requests completed" %(duration, req_count)	print "OK\n"	src.close()	dst.close()def test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits):	#Test data	test_packet = "AABBCCDD"*10	#Configure hosts's UART	uart.close()	uart.baudrate = baudrate	uart.bytesize = bytesize	uart.parity = parity	uart.stopbits = stopbits	uart.open()	# Configure device's UART	if not settings.change(ip, session, baudrate, bytesize, parity, stopbits):		print "UART settings set fail"		sys.exit(2)	else:		print "Settings: %d %d%s%d" %(baudrate, bytesize, parity, stopbits)	# UART -> TCP	print "-"*10	test_src2dst(uart, tcp, test_packet)	# TCP -> UART	print "-"*10	test_src2dst(tcp, uart, test_packet)	# TCP echo: TCP -> UART -> UART -> TCP	print "-"*10	test_echo(tcp, uart, test_packet)	# UART echo: UART -> TCP -> TCP -> UART	print "-"*10	test_echo(uart, tcp, test_packet)	# TCP -> UART -> UART -> TCP (multiple requests)	print "-"*10	test_mult_requests(tcp, uart, "AABBCCDD"*16, 5)	# UART -> TCP -> TCP -> UART (multiple requests)	print "-"*10	test_mult_requests(uart, tcp, "AABBCCDD"*16, 5)def main():	check_all = False	ip = None	port = 1001	serial = '/dev/ttyUSB0'	baudrate = 115200	bytesize = 8	parity = 'N'	stopbits = 1	try:		opts, args = getopt.getopt(sys.argv[1:], "hs:i:p:b:z:y:t:da", \			["help", "serial=", "ip=", "port=", "baud=", "bytesize=",			"parity=", "stopbits=", "debug", "all"])	except getopt.GetoptError:			usage()			sys.exit(2)	for opt, arg in opts:			if opt in ("-h", "--help"):				usage()				sys.exit()			if opt in ("-s", "--serial"): 				serial = arg			if opt in ("-b", "--baud"): 				if (arg.isdigit()):					baudrate = int(arg)				else:					usage()					sys.exit()			if opt in ("-z", "--bytesize"): 				if (arg.isdigit() and arg in ('7', '8')):					bytesize = int(arg)				else:					print arg					usage()					sys.exit()			if opt in ("-y", "--parity"): 				if (arg in ('N', 'O', 'E')):					parity = arg				else:					usage()					sys.exit()			if opt in ("-t", "--stopbits"): 				if (arg in ('1', '2')):					stopbits = int(arg)				else:					usage()					sys.exit()			if opt in ("-i", "--ip"): 				ip = arg			if opt in ("-p", "--port"):				if (arg.isdigit()):					port = int(arg)			if opt in ("-d", "--debug"):				debug.DEBUG = True			if opt in ("-a", "--all"):				check_all = True	if ip == None:		usage()		sys.exit(2)	tcp = PortgwTcp(ip, port)	uart = PortgwSerial(serial, baudrate, bytesize, parity, stopbits)	report = []	session = settings.login(ip)	if session == None:		sys.exit(2)	if check_all:			for baudrate in (1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200):			for bytesize in (7, 8):				for parity in ('N', 'O', 'E'):					#Skip unsupported modes					if bytesize == 7 and parity == 'N':						continue					if stopbits in (1, 2):						#Run tests						test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits)						report.append(str(baudrate) + " " + str(bytesize) + parity + str(stopbits))	else:		test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits)		report.append(str(baudrate) + " " + str(bytesize) + parity + str(stopbits))	print "Checked:"	for i in report:		print i	print "\rAll done"if __name__ == '__main__':	main()
 |