Loading load_check.py +67 −43 Original line number Diff line number Diff line Loading @@ -75,22 +75,23 @@ class MultipleTest(unittest.TestCase): print("잘못된 입력입니다. 기본값 '일반 접속(1)'으로 설정합니다.") self.function_to_execute = 1 print("\n6. [선택] 연결 유지 시간(분)(기본값: 120)") try: self.keep_connection_time = int(input().strip()) * 60 except ValueError: print("잘못된 입력입니다. 기본값 '120분'으로 설정합니다.") self.keep_connection_time = 120 * 60 print("\n7. [선택] 유지 커넥션 개수 / 최대 커넥션 수 까지 도달 시간(분)(기본값: 50 / 30)") print("\n6. [선택] 커넥션 수 / 테스트 시간(분)(기본값: 30개 / 120분)") try: pool_input = input().strip().split('/') self.max_connections = int(pool_input[0].strip()) self.ramp_up_time = int(pool_input[1].strip()) * 60 self.max_connections = int(pool_input[0].strip()) if pool_input[0].strip() else 30 self.keep_connection_time = int(pool_input[1].strip()) * 60 if len(pool_input) > 1 else 120 * 60 # ✅ Validation: ensure test time is at least 2x connections min_required = self.max_connections * 2 if (self.keep_connection_time // 60) < min_required: print(f"[WARN] 테스트 시간({self.keep_connection_time//60}분)이 커넥션 수({self.max_connections})의 2배({min_required}분)보다 짧습니다.") print(f"[INFO] 테스트 시간을 {min_required}분으로 자동 조정합니다.") self.keep_connection_time = min_required * 60 except (ValueError, IndexError): print("잘못된 입력입니다. 기본값 '50개 / 30분'으로 설정합니다.") self.max_connections = 50 self.ramp_up_time = 30 * 60 print("잘못된 입력입니다. 기본값 '30개 / 120분'으로 설정합니다.") self.max_connections = 30 self.keep_connection_time = 120 * 60 # Make Screenshots_YYMMDD_HHmmss folder timestamp = datetime.now().strftime("%y%m%d_%H%M%S") Loading Loading @@ -455,7 +456,7 @@ class MultipleTest(unittest.TestCase): elif account_type == 3: print(f"Session {driver.instance_number}, User: {driver.user_id}: User account - no specific admin actions.") def _run_single_driver_actions(self, address, user_id, pw, account_type, driver_instance_number): def _run_single_driver_actions(self, address, user_id, pw, account_type, driver_instance_number, end_time_global): """Sets up and runs a single driver instance's actions.""" # Create a new driver instance for this thread options = webdriver.ChromeOptions() Loading @@ -480,16 +481,14 @@ class MultipleTest(unittest.TestCase): driver.implicitly_wait(10) # Common implicit wait for this driver self.drivers.append(driver) # Add this driver to the list for tearDown # Start the timer for the connection start_time = time.time() # Log in once at the start of the session wait = WebDriverWait(driver, 10) self._execute_single_action_and_measure(driver, self._login_action, "_login_action", address, user_id, pw, wait) print(f"Starting looped actions for session {driver_instance_number} with user: {user_id}") # Loop actions until the keep connection time is up while time.time() - start_time < self.keep_connection_time: # ✅ Loop until the global end time (all threads stop together) while time.time() < end_time_global: if self.function_to_execute == 1: self._execute_and_measure_actions(driver, account_type, self._run_driver1_actions) elif self.function_to_execute == 2: Loading @@ -497,7 +496,8 @@ class MultipleTest(unittest.TestCase): elif self.function_to_execute == 3: self._execute_and_measure_actions(driver, account_type, self._run_driver3_actions) print(f"Session {driver_instance_number} with user {user_id} finished its action loop.") print(f"Session {driver_instance_number} with user {user_id} finished at global stop time.") def _execute_and_measure_actions(self, driver, account_type, main_driver_func): """Executes the main driver function and measures its duration.""" Loading @@ -509,40 +509,64 @@ class MultipleTest(unittest.TestCase): def test_multiple(self): """Main test method to start multiple threads for load testing.""" address = self.dc_address threads_to_start = [] # Calculate the total number of connections to be made total_connections_to_start = sum(config["duplicate_count"] for config in self.account_configs) # Determine the number of connections to start per second to meet the ramp-up time if self.ramp_up_time > 0: threads_per_second = total_connections_to_start / self.ramp_up_time sleep_time_per_thread = 1.0 / threads_per_second if threads_per_second > 0 else 0 else: sleep_time_per_thread = 0 driver_instance_number = 1 # Create a list of all thread arguments to be started for config in self.account_configs: # Use max_connections from input total_connections_to_start = self.max_connections # Just take the first account config for all threads config = self.account_configs[0] account_type = config["account_type"] username = config["username"] password = config["password"] duplicate_count = config["duplicate_count"] for _ in range(duplicate_count): # Build the thread args list for driver_instance_number in range(1, total_connections_to_start + 1): threads_to_start.append((address, username, password, account_type, driver_instance_number)) driver_instance_number += 1 # Shuffle the list to randomize the order of connections # Shuffle the list (optional randomization) random.shuffle(threads_to_start) active_threads = [] # Ramp up the connections linearly over time for args in threads_to_start: thread = threading.Thread(target=self._run_single_driver_actions, args=args) start_time = datetime.now() end_time_global = time.time() + self.keep_connection_time end_time_dt = datetime.fromtimestamp(end_time_global) print(f"[INFO] Test started at {start_time.strftime('%H:%M:%S')} with {total_connections_to_start} planned threads") print(f"[INFO] Global stop time set to {end_time_dt.strftime('%H:%M:%S')}") # 🚀 Start first thread immediately, then one new thread every minute for i, args in enumerate(threads_to_start, start=1): # Pass the global end time into each thread thread = threading.Thread( target=self._run_single_driver_actions, args=(*args, end_time_global) ) active_threads.append(thread) thread.start() if sleep_time_per_thread > 0: time.sleep(sleep_time_per_thread) # Print status print( f"[INFO] Started thread #{i} at {datetime.now().strftime('%H:%M:%S')} " f"(Active threads: {len(active_threads)})" ) time.sleep(60) # wait 1 minute before starting the next thread # Wait for all threads to complete their timed action loops for thread in active_threads: thread.join() end_time = datetime.now() duration = (end_time - start_time).total_seconds() / 60.0 # minutes print( f"[SUMMARY] All {total_connections_to_start} threads stopped together at global stop time " f"{end_time_dt.strftime('%H:%M:%S')} (Total run: {duration:.1f} minutes)" ) def is_element_present(self, how, what): """Checks if an element is present on the page.""" try: Loading Loading
load_check.py +67 −43 Original line number Diff line number Diff line Loading @@ -75,22 +75,23 @@ class MultipleTest(unittest.TestCase): print("잘못된 입력입니다. 기본값 '일반 접속(1)'으로 설정합니다.") self.function_to_execute = 1 print("\n6. [선택] 연결 유지 시간(분)(기본값: 120)") try: self.keep_connection_time = int(input().strip()) * 60 except ValueError: print("잘못된 입력입니다. 기본값 '120분'으로 설정합니다.") self.keep_connection_time = 120 * 60 print("\n7. [선택] 유지 커넥션 개수 / 최대 커넥션 수 까지 도달 시간(분)(기본값: 50 / 30)") print("\n6. [선택] 커넥션 수 / 테스트 시간(분)(기본값: 30개 / 120분)") try: pool_input = input().strip().split('/') self.max_connections = int(pool_input[0].strip()) self.ramp_up_time = int(pool_input[1].strip()) * 60 self.max_connections = int(pool_input[0].strip()) if pool_input[0].strip() else 30 self.keep_connection_time = int(pool_input[1].strip()) * 60 if len(pool_input) > 1 else 120 * 60 # ✅ Validation: ensure test time is at least 2x connections min_required = self.max_connections * 2 if (self.keep_connection_time // 60) < min_required: print(f"[WARN] 테스트 시간({self.keep_connection_time//60}분)이 커넥션 수({self.max_connections})의 2배({min_required}분)보다 짧습니다.") print(f"[INFO] 테스트 시간을 {min_required}분으로 자동 조정합니다.") self.keep_connection_time = min_required * 60 except (ValueError, IndexError): print("잘못된 입력입니다. 기본값 '50개 / 30분'으로 설정합니다.") self.max_connections = 50 self.ramp_up_time = 30 * 60 print("잘못된 입력입니다. 기본값 '30개 / 120분'으로 설정합니다.") self.max_connections = 30 self.keep_connection_time = 120 * 60 # Make Screenshots_YYMMDD_HHmmss folder timestamp = datetime.now().strftime("%y%m%d_%H%M%S") Loading Loading @@ -455,7 +456,7 @@ class MultipleTest(unittest.TestCase): elif account_type == 3: print(f"Session {driver.instance_number}, User: {driver.user_id}: User account - no specific admin actions.") def _run_single_driver_actions(self, address, user_id, pw, account_type, driver_instance_number): def _run_single_driver_actions(self, address, user_id, pw, account_type, driver_instance_number, end_time_global): """Sets up and runs a single driver instance's actions.""" # Create a new driver instance for this thread options = webdriver.ChromeOptions() Loading @@ -480,16 +481,14 @@ class MultipleTest(unittest.TestCase): driver.implicitly_wait(10) # Common implicit wait for this driver self.drivers.append(driver) # Add this driver to the list for tearDown # Start the timer for the connection start_time = time.time() # Log in once at the start of the session wait = WebDriverWait(driver, 10) self._execute_single_action_and_measure(driver, self._login_action, "_login_action", address, user_id, pw, wait) print(f"Starting looped actions for session {driver_instance_number} with user: {user_id}") # Loop actions until the keep connection time is up while time.time() - start_time < self.keep_connection_time: # ✅ Loop until the global end time (all threads stop together) while time.time() < end_time_global: if self.function_to_execute == 1: self._execute_and_measure_actions(driver, account_type, self._run_driver1_actions) elif self.function_to_execute == 2: Loading @@ -497,7 +496,8 @@ class MultipleTest(unittest.TestCase): elif self.function_to_execute == 3: self._execute_and_measure_actions(driver, account_type, self._run_driver3_actions) print(f"Session {driver_instance_number} with user {user_id} finished its action loop.") print(f"Session {driver_instance_number} with user {user_id} finished at global stop time.") def _execute_and_measure_actions(self, driver, account_type, main_driver_func): """Executes the main driver function and measures its duration.""" Loading @@ -509,40 +509,64 @@ class MultipleTest(unittest.TestCase): def test_multiple(self): """Main test method to start multiple threads for load testing.""" address = self.dc_address threads_to_start = [] # Calculate the total number of connections to be made total_connections_to_start = sum(config["duplicate_count"] for config in self.account_configs) # Determine the number of connections to start per second to meet the ramp-up time if self.ramp_up_time > 0: threads_per_second = total_connections_to_start / self.ramp_up_time sleep_time_per_thread = 1.0 / threads_per_second if threads_per_second > 0 else 0 else: sleep_time_per_thread = 0 driver_instance_number = 1 # Create a list of all thread arguments to be started for config in self.account_configs: # Use max_connections from input total_connections_to_start = self.max_connections # Just take the first account config for all threads config = self.account_configs[0] account_type = config["account_type"] username = config["username"] password = config["password"] duplicate_count = config["duplicate_count"] for _ in range(duplicate_count): # Build the thread args list for driver_instance_number in range(1, total_connections_to_start + 1): threads_to_start.append((address, username, password, account_type, driver_instance_number)) driver_instance_number += 1 # Shuffle the list to randomize the order of connections # Shuffle the list (optional randomization) random.shuffle(threads_to_start) active_threads = [] # Ramp up the connections linearly over time for args in threads_to_start: thread = threading.Thread(target=self._run_single_driver_actions, args=args) start_time = datetime.now() end_time_global = time.time() + self.keep_connection_time end_time_dt = datetime.fromtimestamp(end_time_global) print(f"[INFO] Test started at {start_time.strftime('%H:%M:%S')} with {total_connections_to_start} planned threads") print(f"[INFO] Global stop time set to {end_time_dt.strftime('%H:%M:%S')}") # 🚀 Start first thread immediately, then one new thread every minute for i, args in enumerate(threads_to_start, start=1): # Pass the global end time into each thread thread = threading.Thread( target=self._run_single_driver_actions, args=(*args, end_time_global) ) active_threads.append(thread) thread.start() if sleep_time_per_thread > 0: time.sleep(sleep_time_per_thread) # Print status print( f"[INFO] Started thread #{i} at {datetime.now().strftime('%H:%M:%S')} " f"(Active threads: {len(active_threads)})" ) time.sleep(60) # wait 1 minute before starting the next thread # Wait for all threads to complete their timed action loops for thread in active_threads: thread.join() end_time = datetime.now() duration = (end_time - start_time).total_seconds() / 60.0 # minutes print( f"[SUMMARY] All {total_connections_to_start} threads stopped together at global stop time " f"{end_time_dt.strftime('%H:%M:%S')} (Total run: {duration:.1f} minutes)" ) def is_element_present(self, how, what): """Checks if an element is present on the page.""" try: Loading