TelenkovDmitry 1 рік тому
батько
коміт
f58439a41c
2 змінених файлів з 81 додано та 8 видалено
  1. 72 0
      courses/python_threading/proc1.py
  2. 9 8
      selenium/metrolog.py

+ 72 - 0
courses/python_threading/proc1.py

@@ -0,0 +1,72 @@
+import time
+import multiprocessing
+import psutil
+
+def test():
+    for _ in range(3):
+        print(f"{multiprocessing.current_process().name} - {time.time()}")
+        time.sleep(2)
+
+
+
+def get_os_process():
+    for proc in psutil.process_iter():
+        if proc.name() == 'python.exe':
+            print(proc.name())
+
+def test_prc_1():
+    """Базовые методы процессов"""
+    prc = multiprocessing.Process(target=test, name="prc-1")
+    prc.start()
+    print("Процесс запущен")
+    # # get_os_process()
+
+    prc.join()  # ждет завершения процесса
+
+    while True:
+        print(prc.is_alive())
+        print(prc.pid)
+        time.sleep(5)
+        prc.terminate() # убить процесс
+
+
+def test_prc_2():
+    """Метод join - ожидание завершения процесса"""
+    prc = []
+    for _ in range(3):
+        pr = multiprocessing.Process(target=test)
+        prc.append(pr)
+        pr.start()
+
+    for i in prc:
+        i.join()
+
+    print("Все процессы завершены")
+
+
+class Process(multiprocessing.Process):
+    def run(self):
+        print("work")
+    
+def class_test():
+    pr = Process()
+    pr.start()
+
+
+if __name__ == '__main__':
+
+    
+    # test_prc_2()
+    # prc = multiprocessing.Process(target=test, name="prc-1")
+    # prc.start()
+    # print("Процесс запущен")
+    # # get_os_process()
+
+    # prc.join()  # ждет завершения процесса
+
+    # while True:
+    #     print(prc.is_alive())
+    #     print(prc.pid)
+    #     time.sleep(5)
+    #     prc.terminate() # убить процесс
+

+ 9 - 8
selenium/metrolog.py

@@ -24,13 +24,12 @@ class MetrologTester():
         self.pgw_2 = PortgwTcp(ip, self.PGW_PORT_2)
         self.pgw_3 = PortgwTcp(ip, self.PGW_PORT_3)
 
-        self.pgw_1_thread = threading.Thread(target=self.portgw_sender, args=(self.pgw_1, ))
-        self.pgw_2_thread = threading.Thread(target=self.portgw_sender, args=(self.pgw_2, ))
-        self.pgw_3_thread = threading.Thread(target=self.portgw_sender, args=(self.pgw_3, ))
+        self.pgw_1_thread = threading.Thread(target=self.portgw_sender, args=(self.pgw_1, ), daemon=True)
+        self.pgw_2_thread = threading.Thread(target=self.portgw_sender, args=(self.pgw_2, ), daemon=True)
+        self.pgw_3_thread = threading.Thread(target=self.portgw_sender, args=(self.pgw_3, ), daemon=True)
 
         self.ip = ip
-        self.driver = webdriver.Firefox()
-        self.web_thread = threading.Thread(target=self.test_page_click)
+        self.web_thread = threading.Thread(target=self.test_page_click, daemon=True)
 
     def connect(self):
         self.driver.get('http://' + self.ip)
@@ -43,6 +42,7 @@ class MetrologTester():
         return True if self.driver.title == "Состояние модема" else False
 
     def test_page_click(self):
+        self.driver = webdriver.Firefox()
         self.connect()
         while True:
             if self.login() == False:
@@ -87,14 +87,15 @@ class MetrologTester():
     
 
 def metrolog():
-    robot = MetrologTester('192.168.31.230') 
+    robot = MetrologTester('192.168.30.154') 
     robot.start_pgw_sender([0, 1, 1])
-    robot.start_web_clicker()
+    # robot.start_web_clicker()
     
     
 
 if __name__ == '__main__':
     metrolog()
-    
+    while True:
+        time.sleep(1)