Commit 544733d4 authored by JunHyung An's avatar JunHyung An
Browse files

Add multiple line input on account type

parent 1af31785
Loading
Loading
Loading
Loading
+85 −52
Original line number Diff line number Diff line
@@ -23,23 +23,49 @@ class MultipleTest(unittest.TestCase):
        print("[필수 입력] 테스트 대상 웹 (e.g., https://example.com:8443/): ")
        self.DC_address = input()

        # Get user input for account type, credentials, and duplication count
        # A list is created to store multiple account configurations.
        self.account_configs = []
        # print("\n[필수 입력] 계정 정보를 한 줄씩 입력하세요. (포맷: 유형 / 아이디, 비번 / 세션 수)")
        print("[필수 입력] 계정유형(1-어드민,2-(지역)관리자,3-사용자) / 계정 접속정보 / 커넥션 비중 (e.g., 1 / monitor, monitor / 4)")
        user_input = input("")
        print("예: 1 / admin, admin / 3")
        print("입력을 마치려면 빈 줄에서 엔터키를 누르세요.")

        # A loop reads each line of input until an empty line is entered.
        while True:
            user_input = input("").strip()
            if not user_input:
                # Exit the loop if the input is empty.
                if not self.account_configs:
                    print("입력된 계정 정보가 없습니다. 프로그램을 종료합니다.")
                    exit(1)
                break

            try:
                # Each line is parsed and appended to the list as a dictionary.
                parts = user_input.split(' / ')
            self.account_type = int(parts[0].strip())
                account_type = int(parts[0].strip())
                credentials = parts[1].split(',')
            self.username = credentials[0].strip()
            self.password = credentials[1].strip()
            self.duplicate_count = int(parts[2].strip()) if len(parts) > 2 else 1 # Default to 1 if no duplication count is provided
                username = credentials[0].strip()
                password = credentials[1].strip()
                duplicate_count = int(parts[2].strip())

                config = {
                    "account_type": account_type,
                    "username": username,
                    "password": password,
                    "duplicate_count": duplicate_count
                }
                self.account_configs.append(config)

            except (ValueError, IndexError) as e:
            print(f"Invalid input format: {e}. Please use 'type / username, password / duplicate_count'. Exiting.")
            exit(1)
                print(f"잘못된 입력 형식입니다: '{user_input}'. 해당 줄은 무시됩니다.")

        print("[선택(디폴트:일반접속)]테스트 유형 : 일반 접속(1), 일시 접속(2), 부하 화면 접근(3)")
        print("\n[선택(디폴트:일반접속)] 테스트 유형 : 일반 접속(1), 일시 접속(2), 부하 화면 접근(3)")
        try:
            self.function_to_execute = int(input().strip())
        except ValueError:
            print("잘못된 입력입니다. 기본값 (1)으로 설정합니다.")
            self.function_to_execute = 1

        # Initialize a list to hold driver instances for each thread
        self.drivers = []
@@ -270,7 +296,7 @@ class MultipleTest(unittest.TestCase):
        driver.find_element_by_xpath(
            u"(.//*[normalize-space(text()) and normalize-space(.)='수집/판단 기준'])[1]/following::li[1]").click()

    def _execute_single_action_and_measure(self, driver, action_func, driver_instance_number, action_name, *args, **kwargs):
    def _execute_single_action_and_measure(self, driver, user_id, action_func, driver_instance_number, action_name, *args, **kwargs):
        """Executes a single action and measures its duration."""
        start_time = time.time()
        try:
@@ -279,23 +305,24 @@ class MultipleTest(unittest.TestCase):
            print(f"Session {driver_instance_number}, Action {action_name} failed: {e}")
        finally:
            end_time = time.time()
            print(f"Session {driver_instance_number}, User: {self.username}: {action_name} executed in {end_time - start_time:.2f} seconds")

            # Use the 'user_id' parameter passed to the function
            print(f"Session {driver_instance_number}, User: {user_id}: {action_name} executed in {end_time - start_time:.2f} seconds")

    def _run_driver1_actions(self, driver, address, id, pw, account_type):
        wait = WebDriverWait(driver, 10)
        instance_number = driver.instance_number # Assuming instance_number is set on driver in _run_single_driver_actions
        instance_number = driver.instance_number

        self._execute_single_action_and_measure(driver, self._login_action, instance_number, "_login_action", address, id, pw, wait)
        self._execute_single_action_and_measure(driver, self._dashboard_actions, instance_number, "_dashboard_actions")
        self._execute_single_action_and_measure(driver, self._topology_map_actions, instance_number, "_topology_map_actions")
        self._execute_single_action_and_measure(driver, self._performance_operation_actions, instance_number, "_performance_operation_actions")
        self._execute_single_action_and_measure(driver, self._log_analysis_actions, instance_number, "_log_analysis_actions")
        self._execute_single_action_and_measure(driver, self._event_status_actions, instance_number, "_event_status_actions")
        self._execute_single_action_and_measure(driver, self._work_management_actions, instance_number, "_work_management_actions", wait)
        # Pass the user 'id' as the second argument in every call
        self._execute_single_action_and_measure(driver, id, self._login_action, instance_number, "_login_action", address, id, pw, wait)
        self._execute_single_action_and_measure(driver, id, self._dashboard_actions, instance_number, "_dashboard_actions")
        self._execute_single_action_and_measure(driver, id, self._topology_map_actions, instance_number, "_topology_map_actions")
        self._execute_single_action_and_measure(driver, id, self._performance_operation_actions, instance_number, "_performance_operation_actions")
        self._execute_single_action_and_measure(driver, id, self._log_analysis_actions, instance_number, "_log_analysis_actions")
        self._execute_single_action_and_measure(driver, id, self._event_status_actions, instance_number, "_event_status_actions")
        self._execute_single_action_and_measure(driver, id, self._work_management_actions, instance_number, "_work_management_actions", wait)

        if account_type == 1:
            self._execute_single_action_and_measure(driver, self._admin_settings_actions, instance_number, "_admin_settings_actions")
            self._execute_single_action_and_measure(driver, id, self._admin_settings_actions, instance_number, "_admin_settings_actions")
        elif account_type == 2:
            print(f"Session {instance_number}, User: {id}: Manager account - no specific admin actions.")
        elif account_type == 3:
@@ -304,13 +331,13 @@ class MultipleTest(unittest.TestCase):

    def _run_driver2_actions(self, driver, address, id, pw, account_type):
        wait = WebDriverWait(driver, 10)

        instance_number = driver.instance_number

        self._execute_single_action_and_measure(driver, self._login_action, instance_number, "_login_action", address, id, pw, wait)
        self._execute_single_action_and_measure(driver, self._dashboard_actions, instance_number, "_dashboard_actions")
        self._execute_single_action_and_measure(driver, self._topology_map_actions, instance_number, "_topology_map_actions")
        self._execute_single_action_and_measure(driver, self._event_status_actions, instance_number, "_event_status_actions")
        # Pass the user 'id' as the second argument in every call
        self._execute_single_action_and_measure(driver, id, self._login_action, instance_number, "_login_action", address, id, pw, wait)
        self._execute_single_action_and_measure(driver, id, self._dashboard_actions, instance_number, "_dashboard_actions")
        self._execute_single_action_and_measure(driver, id, self._topology_map_actions, instance_number, "_topology_map_actions")
        self._execute_single_action_and_measure(driver, id, self._event_status_actions, instance_number, "_event_status_actions")

        if account_type == 1:
            print(f"Session {instance_number}, User: {id}: Admin actions not defined for this driver function.")
@@ -322,16 +349,16 @@ class MultipleTest(unittest.TestCase):

    def _run_driver3_actions(self, driver, address, id, pw, account_type):
        wait = WebDriverWait(driver, 10)

        instance_number = driver.instance_number

        self._execute_single_action_and_measure(driver, self._login_action, instance_number, "_login_action", address, id, pw, wait)
        self._execute_single_action_and_measure(driver, self._dashboard_actions, instance_number, "_dashboard_actions (1st)")
        self._execute_single_action_and_measure(driver, self._dashboard_actions, instance_number, "_dashboard_actions (2nd)")
        self._execute_single_action_and_measure(driver, self._performance_operation_actions, instance_number, "_performance_operation_actions (1st)")
        self._execute_single_action_and_measure(driver, self._performance_operation_actions, instance_number, "_performance_operation_actions (2nd)")
        self._execute_single_action_and_measure(driver, self._log_analysis_actions, instance_number, "_log_analysis_actions (1st)")
        self._execute_single_action_and_measure(driver, self._log_analysis_actions, instance_number, "_log_analysis_actions (2nd)")
        # Pass the user 'id' as the second argument in every call
        self._execute_single_action_and_measure(driver, id, self._login_action, instance_number, "_login_action", address, id, pw, wait)
        self._execute_single_action_and_measure(driver, id, self._dashboard_actions, instance_number, "_dashboard_actions (1st)")
        self._execute_single_action_and_measure(driver, id, self._dashboard_actions, instance_number, "_dashboard_actions (2nd)")
        self._execute_single_action_and_measure(driver, id, self._performance_operation_actions, instance_number, "_performance_operation_actions (1st)")
        self._execute_single_action_and_measure(driver, id, self._performance_operation_actions, instance_number, "_performance_operation_actions (2nd)")
        self._execute_single_action_and_measure(driver, id, self._log_analysis_actions, instance_number, "_log_analysis_actions (1st)")
        self._execute_single_action_and_measure(driver, id, self._log_analysis_actions, instance_number, "_log_analysis_actions (2nd)")

        if account_type == 1:
            print(f"Session {instance_number}, User: {id}: Admin actions not defined for this driver function.")
@@ -384,26 +411,32 @@ class MultipleTest(unittest.TestCase):
        print(f"Total time for Session {driver_instance_number}, User: {id} ({main_driver_func.__name__}): {end_time - start_time:.2f} seconds")
        time.sleep(1)


    def test_multiple(self):
        address = self.DC_address
        username = self.username
        password = self.password
        account_type = self.account_type
        duplicate_count = self.duplicate_count

        threads = []
        for i in range(duplicate_count):
            # Pass unique driver instance to each thread, and the chosen function
            thread = threading.Thread(target=self._run_single_driver_actions, args=(address, username, password, account_type, i + 1))
        # A unique number for each browser session instance for logging.
        driver_instance_number = 1

        # The outer loop iterates through each configuration (e.g., admin, manager, user).
        for config in self.account_configs:
            account_type = config["account_type"]
            username = config["username"]
            password = config["password"]
            duplicate_count = config["duplicate_count"]

            # The inner loop creates the specified number of threads for the current configuration.
            for _ in range(duplicate_count):
                thread = threading.Thread(target=self._run_single_driver_actions, args=(address, username, password, account_type, driver_instance_number))
                threads.append(thread)
                thread.start()
            # Add a small delay between launching threads to prevent immediate resource contention
                # Increment the instance number for the next session.
                driver_instance_number += 1
                time.sleep(0.5)

        for thread in threads:
            thread.join() # Wait for all threads to complete


    def is_element_present(self, how, what):
        try:
            self.driver.find_element(by=how, value=what)