portgw_test.py 7.0 KB


  1. #!/usr/bin/python
  2. import sys, getopt, time
  3. import debug, json, settings
  4. from datetime import datetime
  5. from transport import PortgwSerial
  6. from transport import PortgwTcp
  7. def usage():
  8. print "Usage:\r\n\
  9. --ip=<address> \t\tIP adress to connect to\r\n\
  10. --port=<number> \tPort number to connect to\r\n\
  11. --serial=<device> \tSerial port to use\r\n\
  12. --baud=<baudrate> \tbaudrate\r\n\
  13. --bytesize=<number> \tbyte size in bits\r\n\
  14. --parity=['N'|'O'|'E'] \tparity: none, odd, even\r\n\
  15. --stopbits=<number> \tnumber of stop bits\r\n\
  16. --all \t\t\tcheck all UART modes\r\n\
  17. --help \t\t\tprint usage info\r\n\
  18. --debug \t\tenable debug output"
  19. def print_bitrate(datalen, duration):
  20. print "%.1f sec, %d bps" %(duration, datalen * 8 / duration)
  21. def test_src2dst(src, dst, data):
  22. print "%s -> %s (%d bytes)" %(src.name, dst.name, len(data))
  23. if not src.open():
  24. print "%s open failed" %(src.name)
  25. sys.exit()
  26. if not dst.open():
  27. print "%s open failed" %(dst.name)
  28. sys.exit()
  29. txdata = data
  30. debug.print_debug("%s TX (%d bytes): %s" %(src.name, len(txdata), txdata[:10] + "..."))
  31. timestamp = time.time()
  32. res = src.write(txdata)
  33. if res <= 0:
  34. print "%s write fail" %(src.name)
  35. sys.exit()
  36. rxdata = dst.read(len(txdata), 1)
  37. if res == None:
  38. print "%s read fail" %(dst.name)
  39. sys.exit()
  40. debug.print_debug("%s RX (%d bytes): %s" %(dst.name, len(rxdata), rxdata[:10] + "..."))
  41. if rxdata != txdata:
  42. debug.print_data("TX:\n", txdata)
  43. debug.print_data("RX:\n", rxdata)
  44. print "Test failed:"
  45. sys.exit()
  46. print_bitrate(len(data), time.time() - timestamp)
  47. print "OK\n"
  48. dst.close()
  49. src.close()
  50. def test_echo(src, dst, data):
  51. print "%s -> %s -> %s -> %s (%d bytes)" %(src.name, dst.name, dst.name, src.name, len(data))
  52. if not src.open():
  53. print "TCP open failed"
  54. sys.exit()
  55. if not dst.open():
  56. print "UART open failed"
  57. sys.exit()
  58. txdata = data
  59. debug.print_debug("%s TX (%d bytes): %s" %(src.name, len(txdata), txdata[:10] + "..."))
  60. timestamp = time.time()
  61. res = src.write(txdata)
  62. if res <= 0:
  63. print "Write fail"
  64. sys.exit()
  65. echo = dst.read(len(txdata))
  66. if res == None:
  67. print "Read fail"
  68. sys.exit()
  69. debug.print_debug("%s ECHO (%d bytes): %s" %(dst.name, len(echo), echo[:10] + "..."))
  70. res = dst.write(echo)
  71. if res <= 0:
  72. print "Write fail"
  73. sys.exit()
  74. rxdata = src.read(len(echo))
  75. if res == None:
  76. print "Read fail"
  77. sys.exit()
  78. debug.print_debug("%s RX (%d bytes): %s" %(src.name, len(rxdata), rxdata[:10] + "..."))
  79. if rxdata != txdata:
  80. print "Fail"
  81. sys.exit()
  82. #Double data length (echo)
  83. print_bitrate(len(data) * 2, time.time() - timestamp)
  84. print "OK\n"
  85. src.close()
  86. dst.close()
  87. def test_mult_requests(src, dst, data, duration):
  88. print "%s -> %s -> %s -> %s (%d bytes, multi: %d sec)" %(src.name, dst.name, dst.name, src.name,
  89. len (data), duration)
  90. if not src.open():
  91. print "TCP open failed"
  92. sys.exit()
  93. if not dst.open():
  94. print "UART open failed"
  95. sys.exit()
  96. txdata = data
  97. req_count = 0
  98. timestamp = time.time()
  99. while time.time() - timestamp < duration:
  100. print "\rDuration: %.1f/%.1f sec, request: %d" % (time.time() - timestamp, duration, req_count + 1),
  101. sys.stdout.flush()
  102. debug.print_debug("%s TX (%d bytes): %s" %(src.name, len(txdata), txdata[:10] + "..."))
  103. res = src.write(txdata)
  104. if res <= 0:
  105. print "Write fail"
  106. sys.exit()
  107. echo = dst.read(len(txdata))
  108. if res == None:
  109. print "Read fail"
  110. sys.exit()
  111. debug.print_debug("%s ECHO (%d bytes): %s" %(dst.name, len(echo), echo[:10] + "..."))
  112. res = dst.write(echo)
  113. if res <= 0:
  114. print "Write fail"
  115. sys.exit()
  116. rxdata = src.read(len(echo))
  117. if res == None:
  118. print "Read fail"
  119. sys.exit()
  120. debug.print_debug("%s RX (%d bytes): %s\n" %(src.name, len(rxdata), rxdata[:10] + "..."))
  121. if rxdata != txdata:
  122. print "Fail"
  123. sys.exit()
  124. req_count += 1
  125. print "\nTest duration: %d sec, %d requests completed" %(duration, req_count)
  126. print "OK\n"
  127. src.close()
  128. dst.close()
  129. def test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits):
  130. #Test data
  131. test_packet = "AABBCCDD"*10
  132. #Configure hosts's UART
  133. uart.close()
  134. uart.baudrate = baudrate
  135. uart.bytesize = bytesize
  136. uart.parity = parity
  137. uart.stopbits = stopbits
  138. uart.open()
  139. # Configure device's UART
  140. if not settings.change(ip, session, baudrate, bytesize, parity, stopbits):
  141. print "UART settings set fail"
  142. sys.exit(2)
  143. else:
  144. print "Settings: %d %d%s%d" %(baudrate, bytesize, parity, stopbits)
  145. # UART -> TCP
  146. print "-"*10
  147. test_src2dst(uart, tcp, test_packet)
  148. # TCP -> UART
  149. print "-"*10
  150. test_src2dst(tcp, uart, test_packet)
  151. # TCP echo: TCP -> UART -> UART -> TCP
  152. print "-"*10
  153. test_echo(tcp, uart, test_packet)
  154. # UART echo: UART -> TCP -> TCP -> UART
  155. print "-"*10
  156. test_echo(uart, tcp, test_packet)
  157. # TCP -> UART -> UART -> TCP (multiple requests)
  158. print "-"*10
  159. test_mult_requests(tcp, uart, "AABBCCDD"*16, 5)
  160. # UART -> TCP -> TCP -> UART (multiple requests)
  161. print "-"*10
  162. test_mult_requests(uart, tcp, "AABBCCDD"*16, 5)
  163. def main():
  164. check_all = False
  165. ip = None
  166. port = 1001
  167. serial = '/dev/ttyUSB0'
  168. baudrate = 115200
  169. bytesize = 8
  170. parity = 'N'
  171. stopbits = 1
  172. try:
  173. opts, args = getopt.getopt(sys.argv[1:], "hs:i:p:b:z:y:t:da", \
  174. ["help", "serial=", "ip=", "port=", "baud=", "bytesize=",
  175. "parity=", "stopbits=", "debug", "all"])
  176. except getopt.GetoptError:
  177. usage()
  178. sys.exit(2)
  179. for opt, arg in opts:
  180. if opt in ("-h", "--help"):
  181. usage()
  182. sys.exit()
  183. if opt in ("-s", "--serial"):
  184. serial = arg
  185. if opt in ("-b", "--baud"):
  186. if (arg.isdigit()):
  187. baudrate = int(arg)
  188. else:
  189. usage()
  190. sys.exit()
  191. if opt in ("-z", "--bytesize"):
  192. if (arg.isdigit() and arg in ('7', '8')):
  193. bytesize = int(arg)
  194. else:
  195. print arg
  196. usage()
  197. sys.exit()
  198. if opt in ("-y", "--parity"):
  199. if (arg in ('N', 'O', 'E')):
  200. parity = arg
  201. else:
  202. usage()
  203. sys.exit()
  204. if opt in ("-t", "--stopbits"):
  205. if (arg in ('1', '2')):
  206. stopbits = int(arg)
  207. else:
  208. usage()
  209. sys.exit()
  210. if opt in ("-i", "--ip"):
  211. ip = arg
  212. if opt in ("-p", "--port"):
  213. if (arg.isdigit()):
  214. port = int(arg)
  215. if opt in ("-d", "--debug"):
  216. debug.DEBUG = True
  217. if opt in ("-a", "--all"):
  218. check_all = True
  219. if ip == None:
  220. usage()
  221. sys.exit(2)
  222. tcp = PortgwTcp(ip, port)
  223. uart = PortgwSerial(serial, baudrate, bytesize, parity, stopbits)
  224. report = []
  225. session = settings.login(ip)
  226. if session == None:
  227. sys.exit(2)
  228. if check_all:
  229. for baudrate in (1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200):
  230. for bytesize in (7, 8):
  231. for parity in ('N', 'O', 'E'):
  232. #Skip unsupported modes
  233. if bytesize == 7 and parity == 'N':
  234. continue
  235. if stopbits in (1, 2):
  236. #Run tests
  237. test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits)
  238. report.append(str(baudrate) + " " + str(bytesize) + parity + str(stopbits))
  239. else:
  240. test_suite(uart, tcp, ip, session, baudrate, bytesize, parity, stopbits)
  241. report.append(str(baudrate) + " " + str(bytesize) + parity + str(stopbits))
  242. print "Checked:"
  243. for i in report:
  244. print i
  245. print "\rAll done"
  246. if __name__ == '__main__':
  247. main()