Commit 2990ccab authored by JunHyung An's avatar JunHyung An
Browse files

Add feature on user input - keep connection time, keep connection count(pool)

parent 1c149e03
Loading
Loading
Loading
Loading
Loading
+75 −51
Original line number Diff line number Diff line
@@ -5,6 +5,7 @@ from selenium.common.exceptions import NoSuchElementException, NoAlertPresentExc
import unittest, time
import threading  # Import the threading module
import sys # Import the sys module for OS detection
import random # Import the random module for shuffling

# Import for Explicit Waits
from selenium.webdriver.support.ui import WebDriverWait
@@ -12,7 +13,6 @@ from selenium.webdriver.support import expected_conditions as EC

class MultipleTest(unittest.TestCase):
    def setUp(self):

        # Test case user input
        print("[필수 입력] Chrome 위치 (e.g., /opt/chrome-linux64/chrome, D:\\tmp\\chrome-win64\\chrome.exe): ")
        self.binary_location = input().strip()
@@ -41,7 +41,6 @@ class MultipleTest(unittest.TestCase):
                    print("입력된 계정 정보가 없습니다. 프로그램을 종료합니다.")
                    exit(1)
                break

            try:
                # Each line is parsed and appended to the list as a dictionary.
                parts = user_input.split(' / ')
@@ -58,10 +57,10 @@ class MultipleTest(unittest.TestCase):
                    "duplicate_count": duplicate_count
                }
                self.account_configs.append(config)

            except (ValueError, IndexError) as e:
                print(f"잘못된 입력 형식입니다: '{user_input}'. 해당 줄은 무시됩니다.")
        
        # New inputs for connection management
        print("\n[선택(디폴트:일반접속)] 테스트 유형 : 일반 접속(1), 일시 접속(2), 부하 화면 접근(3)")
        try:
            self.function_to_execute = int(input().strip())
@@ -69,14 +68,29 @@ class MultipleTest(unittest.TestCase):
            print("잘못된 입력입니다. 기본값 (1)으로 설정합니다.")
            self.function_to_execute = 1

        print("\n[선택(디폴트:60)] 커넥션 유지 시간 (분): ")
        try:
            self.keep_connection_time = int(input().strip()) * 60
        except ValueError:
            print("잘못된 입력입니다. 기본값 60분으로 설정합니다.")
            self.keep_connection_time = 60 * 60
        
        print("\n[선택(디폴트:50/30)] 최대 커넥션 수 / 최대 커넥션 도달 시간 (분): ")
        try:
            pool_input = input().strip().split('/')
            self.max_connections = int(pool_input[0].strip())
            self.ramp_up_time = int(pool_input[1].strip()) * 60
        except (ValueError, IndexError):
            print("잘못된 입력입니다. 기본값 50 / 30분으로 설정합니다.")
            self.max_connections = 50
            self.ramp_up_time = 30 * 60

        # Initialize a list to hold driver instances for each thread
        self.drivers = []
        self.verificationErrors = []
        self.accept_next_alert = True


    # ... (rest of your _interact_with_time_filters, _login_action, etc. methods remain unchanged)

    def _interact_with_time_filters(self, driver):
        """
        Automates the interaction with time filter buttons on a webpage using Selenium WebDriver.
@@ -330,34 +344,26 @@ class MultipleTest(unittest.TestCase):
    def _run_driver1_actions(self, driver, address, id, pw, account_type):
        wait = WebDriverWait(driver, 10)
        instance_number = driver.instance_number

        # Pass the user 'id' as the second argument in every call
        self._execute_single_action_and_measure(driver, id, self._login_action, instance_number, "_login_action", address, id, pw, wait)
        # These actions will be repeated in a loop
        self._execute_single_action_and_measure(driver, id, self._dashboard_actions, instance_number, "_dashboard_actions")
        self._execute_single_action_and_measure(driver, id, self._topology_map_actions, instance_number, "_topology_map_actions")
        self._execute_single_action_and_measure(driver, id, self._performance_operation_actions, instance_number, "_performance_operation_actions")
        self._execute_single_action_and_measure(driver, id, self._log_analysis_actions, instance_number, "_log_analysis_actions")
        self._execute_single_action_and_measure(driver, id, self._event_status_actions, instance_number, "_event_status_actions")
        self._execute_single_action_and_measure(driver, id, self._work_management_actions, instance_number, "_work_management_actions", wait)

        if account_type == 1:
            self._execute_single_action_and_measure(driver, id, self._admin_settings_actions, instance_number, "_admin_settings_actions")
        elif account_type == 2:
            print(f"Session {instance_number}, User: {id}: Manager account - no specific admin actions.")
        elif account_type == 3:
            print(f"Session {instance_number}, User: {id}: User account - no specific admin actions.")


    def _run_driver2_actions(self, driver, address, id, pw, account_type):
        wait = WebDriverWait(driver, 10)
        instance_number = driver.instance_number

        # Pass the user 'id' as the second argument in every call
        self._execute_single_action_and_measure(driver, id, self._login_action, instance_number, "_login_action", address, id, pw, wait)
        # These actions will be repeated in a loop
        self._execute_single_action_and_measure(driver, id, self._dashboard_actions, instance_number, "_dashboard_actions")
        self._execute_single_action_and_measure(driver, id, self._topology_map_actions, instance_number, "_topology_map_actions")
        self._execute_single_action_and_measure(driver, id, self._event_status_actions, instance_number, "_event_status_actions")

        if account_type == 1:
            print(f"Session {instance_number}, User: {id}: Admin actions not defined for this driver function.")
        elif account_type == 2:
@@ -365,20 +371,16 @@ class MultipleTest(unittest.TestCase):
        elif account_type == 3:
            print(f"Session {instance_number}, User: {id}: User account - no specific admin actions.")
 

    def _run_driver3_actions(self, driver, address, id, pw, account_type):
        wait = WebDriverWait(driver, 10)
        instance_number = driver.instance_number

        # Pass the user 'id' as the second argument in every call
        self._execute_single_action_and_measure(driver, id, self._login_action, instance_number, "_login_action", address, id, pw, wait)
        # These actions will be repeated in a loop
        self._execute_single_action_and_measure(driver, id, self._dashboard_actions, instance_number, "_dashboard_actions (1st)")
        self._execute_single_action_and_measure(driver, id, self._dashboard_actions, instance_number, "_dashboard_actions (2nd)")
        self._execute_single_action_and_measure(driver, id, self._performance_operation_actions, instance_number, "_performance_operation_actions (1st)")
        self._execute_single_action_and_measure(driver, id, self._performance_operation_actions, instance_number, "_performance_operation_actions (2nd)")
        self._execute_single_action_and_measure(driver, id, self._log_analysis_actions, instance_number, "_log_analysis_actions (1st)")
        self._execute_single_action_and_measure(driver, id, self._log_analysis_actions, instance_number, "_log_analysis_actions (2nd)")

        if account_type == 1:
            self._execute_single_action_and_measure(driver, id, self._admin_alarm_settings_actions, instance_number, "_admin_alarm_settings_actions")
            #self._execute_single_action_and_measure(driver, id, self._admin_alarm_settings_actions, instance_number, "_admin_alarm_settings_actions")
@@ -388,13 +390,11 @@ class MultipleTest(unittest.TestCase):
            print(f"Session {instance_number}, User: {id}: User account - no specific admin actions.")
            
    def _run_single_driver_actions(self, address, id, pw, account_type, driver_instance_number):

        # Create a new driver instance for this thread
        options = webdriver.ChromeOptions()
        options.add_experimental_option("excludeSwitches", ["enable-logging"])
        options.add_argument("--ignore-certificate-errors")
        options.add_argument('--start-maximized')

        # Secret Mode
        # options.add_argument("--incognito")
        # Guest Mode
@@ -403,19 +403,26 @@ class MultipleTest(unittest.TestCase):
        options.add_argument("--headless=new")
        # options.add_argument("--disable-gpu")
        options.binary_location = self.binary_location

        # Linux Only
        if sys.platform.startswith('linux'): # Check if the operating system is Linux
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")

        driver = webdriver.Chrome(executable_path=self.webdriver_path, options=options)
        driver.instance_number = driver_instance_number # Attach instance number to driver for logging
        driver.implicitly_wait(10)  # Common implicit wait for this driver
        self.drivers.append(driver)  # Add this driver to the list for tearDown
        
        print(f"Starting actions for session {driver_instance_number} with user: {id}")
        # Start the timer for the connection
        start_time = time.time()
        
        # Log in once at the start of the session
        wait = WebDriverWait(driver, 10)
        self._execute_single_action_and_measure(driver, id, self._login_action, driver_instance_number, "_login_action", address, id, pw, wait)
        
        print(f"Starting looped actions for session {driver_instance_number} with user: {id}")
        
        # Loop actions until the keep connection time is up
        while time.time() - start_time < self.keep_connection_time:
            if self.function_to_execute == 1:
                self._execute_and_measure_actions(driver, address, id, pw, account_type, driver_instance_number, self._run_driver1_actions)
            elif self.function_to_execute == 2:
@@ -423,9 +430,10 @@ class MultipleTest(unittest.TestCase):
            elif self.function_to_execute == 3:
                self._execute_and_measure_actions(driver, address, id, pw, account_type, driver_instance_number, self._run_driver3_actions)
        
        print(f"Session {driver_instance_number} with user {id} finished its action loop.")

    def _execute_and_measure_actions(self, driver, address, id, pw, account_type, driver_instance_number, main_driver_func):
        start_time = time.time()

        main_driver_func(driver, address, id, pw, account_type)
        end_time = time.time()
        print(f"Total time for Session {driver_instance_number}, User: {id} ({main_driver_func.__name__}): {end_time - start_time:.2f} seconds")
@@ -433,29 +441,46 @@ class MultipleTest(unittest.TestCase):

    def test_multiple(self):
        address = self.DC_address
        threads = []
        # A unique number for each browser session instance for logging.
        threads_to_start = []
        
        # Calculate the total number of connections to be made
        total_connections_to_start = sum(config["duplicate_count"] for config in self.account_configs)
        
        # Determine the number of connections to start per second to meet the ramp-up time
        if self.ramp_up_time > 0:
            threads_per_second = total_connections_to_start / self.ramp_up_time
            sleep_time_per_thread = 1.0 / threads_per_second if threads_per_second > 0 else 0
        else:
            sleep_time_per_thread = 0

        driver_instance_number = 1
        
        # The outer loop iterates through each configuration (e.g., admin, manager, user).
        # Create a list of all thread arguments to be started
        for config in self.account_configs:
            account_type = config["account_type"]
            username = config["username"]
            password = config["password"]
            duplicate_count = config["duplicate_count"]

            # The inner loop creates the specified number of threads for the current configuration.
            for _ in range(duplicate_count):
                thread = threading.Thread(target=self._run_single_driver_actions, args=(address, username, password, account_type, driver_instance_number))
                threads.append(thread)
                thread.start()
                # Increment the instance number for the next session.
                threads_to_start.append((address, username, password, account_type, driver_instance_number))
                driver_instance_number += 1
                time.sleep(0.5)

        for thread in threads:
            thread.join() # Wait for all threads to complete
        # Shuffle the list to randomize the order of connections
        random.shuffle(threads_to_start)

        active_threads = []

        # Ramp up the connections linearly over time
        for args in threads_to_start:
            thread = threading.Thread(target=self._run_single_driver_actions, args=args)
            active_threads.append(thread)
            thread.start()
            if sleep_time_per_thread > 0:
                time.sleep(sleep_time_per_thread)

        # Wait for all threads to complete their timed action loops
        for thread in active_threads:
            thread.join()

    def is_element_present(self, how, what):
        try:
@@ -492,6 +517,5 @@ class MultipleTest(unittest.TestCase):
                print(f"Error quitting driver: {e}")
        self.assertEqual([], self.verificationErrors)


if __name__ == "__main__":
    unittest.main()