Commit e4278302 authored by Watchtek's avatar Watchtek
Browse files

Merge branch 'dev' into 'main'

Add or change several things

See merge request watchtek-group/load-test!15
parents f9493f30 74f1db2b
Loading
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ variables:
      artifacts: false
  script:
    - echo "Creating artifact for $TARGET_OS"
    - export HOME=/tmp
    - pip install -r requirements.txt
    - pyinstaller --onefile ${BINARY_NAME}.py
    - export TS=$(date +%Y-%m-%d_%H%M)
+1 −1
Original line number Diff line number Diff line
@@ -3,7 +3,7 @@
# disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments, too-many-locals
# Same with above
# Disable specific message IDs.
disable=R0902, R0913, R0915, R0917, R0914
disable=R0902, R0913, R0915, R0917, R0914, R0912

[FORMAT]
# Maximum number of characters on a single line.
+124 −46
Original line number Diff line number Diff line
@@ -23,8 +23,20 @@ class MultipleTest(unittest.TestCase):
    def _get_user_inputs(self):
        """Helper method to get user inputs and set up initial configuration."""
        # Test case user input
        while True:
            print("1. [필수 입력] Chrome 위치 (e.g., /opt/chrome-linux64/chrome, D:\\tmp\\chrome-win64\\chrome.exe): ")
        self.binary_location = input().strip()
            chrome_path = input().strip()

            if not chrome_path:
                print("잘못 입력하였습니다. 다시 입력해주세요.\n")
                continue

            if os.path.exists(chrome_path) and os.path.isfile(chrome_path):
                self.binary_location = chrome_path
                break

            print(f"입력한 경로가 존재하지 않거나 파일이 아닙니다: {chrome_path}")
            print("다시 입력해주세요.\n")

        print("\n2. [필수 입력] 테스트 대상 웹 (e.g., https://example.com:8443/): ")
        self.dc_address = input()
@@ -34,51 +46,42 @@ class MultipleTest(unittest.TestCase):
            print("EMS 접속이 원할하지 않습니다. 프로그램을 종료합니다")
            sys.exit(1)

        # A list is created to store multiple account configurations.
        self.account_configs = []
        # print("\n[필수 입력] 계정 정보를 한 줄씩 입력하세요. (포맷: 유형 / 아이디, 비번 / 세션 수)")
        print("\n3. [필수 입력] 계정유형(1-어드민,2-(지역)관리자,3-사용자) / 계정 접속정보 / 커넥션 비중")
        print("ex) 1 / admin, admin / 3")
        print("ex) 2 / watchall, watchall / 3")
        print("ex) 3 / monitor, monitor / 4")
        print("입력을 마치려면 빈 줄에서 엔터키를 누르세요.")

        # A loop reads each line of input until an empty line is entered.
        print("\n3. [필수 입력] 계정유형(1-어드민,2-(지역)관리자,3-사용자) / 계정 접속정보")
        print("형식 예시: 1 / admin, admin")

        while True:
            user_input = input("").strip()
            if not user_input:
                # Exit the loop if the input is empty.
                if not self.account_configs:
                    print("입력된 계정 정보가 없습니다. 프로그램을 종료합니다.")
                    sys.exit(1)
                break
            try:
                # Each line is parsed and appended to the list as a dictionary.
                parts = user_input.split(' / ')
                account_type = int(parts[0].strip())
                credentials = parts[1].split(',')
                username = credentials[0].strip()
                password = credentials[1].strip()
                duplicate_count = int(parts[2].strip())

                config = {
                    "account_type": account_type,
                    "username": username,
                    "password": password,
                    "duplicate_count": duplicate_count
                }
                config = self._parse_account_line(user_input)
                self.account_configs.append(config)
            except (ValueError, IndexError):
                print(f"잘못된 입력 형식입니다: '{user_input}'. 해당 줄은 무시됩니다.")
                # success -> stop re-prompting
                break
            except ValueError as e:
                print(f"입력 형식 오류: {e}. 다시 입력하세요.")

        # New inputs for connection management
        print("\n4. [선택] 테스트 유형(기본값: 일반접속): 일반 접속(1), 일시 접속(2), 부하 화면 접근(3)")
        print("\n4. [선택] 테스트 유형(기본값: 일반접속): 일반 접속(1), 대시보드 부하(2)")
        user_input = input().strip()

        if not user_input:
            # ✅ empty input → default
            print("입력이 비어있습니다. 기본값 '일반 접속(1)'으로 설정합니다.")
            self.function_to_execute = 1
        else:
            try:
            self.function_to_execute = int(input().strip())
                choice = int(user_input)
                # if choice in (1, 2):
                if choice == 1:
                    self.function_to_execute = choice
                else:
                    print("현재 스크립트는 일반 접속 방법만 지원합니다. '일반 접속(1)'으로 설정합니다.")
                    self.function_to_execute = 1
            except ValueError:
            print("잘못 입력니다. 기본값 '일반 접속(1)'으로 설정합니다.")
                print("잘못 입력하셨습니다. 기본값 '일반 접속(1)'으로 설정합니다.")
                self.function_to_execute = 1


        print("\n5. [선택] 커넥션 수 / 테스트 시간(분)(기본값: 30개 / 120분)")
        try:
            pool_input = input().strip().split('/')
@@ -103,6 +106,51 @@ class MultipleTest(unittest.TestCase):
        os.makedirs(self.screenshots_folder, exist_ok=True)
        self.screenshot_base_dir = self.screenshots_folder

        # Debug screenshot toggle:
        # - Environment variable SCREENSHOT_ALL=1 / true / y will enable all screenshots (non-interactive)
        # - Otherwise ask interactively (default No)
        env_flag = os.getenv("SCREENSHOT_ALL", "").lower()
        if env_flag in ("1", "true", "y", "yes"):
            self.screenshot_all = True
            print("[INFO] SCREENSHOT_ALL environment variable enabled: saving screenshots for ALL drivers (debug mode).")
        else:
            print("\n6. [선택] 모든 세션에서 스크린샷을 저장하시겠습니까? (y/N): ")
            ans = input().strip().lower()
            self.screenshot_all = ans in ("y", "yes")
            if self.screenshot_all:
                print("[INFO] Debug mode ON: saving screenshots for ALL drivers.")
            else:
                print("[INFO] Default mode: saving screenshots only for first and last drivers.")

    def _parse_account_line(self, user_input: str):
        """Parse a single account config line in the format:
           '<type> / username, password'
           Returns a dict on success or raises ValueError on bad format.
        """
        if not user_input or not user_input.strip():
            raise ValueError("다시 입력해 주십시요.")

        parts = user_input.split(' / ')
        if len(parts) < 2:
            raise ValueError("입력 형식이 잘못되었습니다. 예시: 1 / admin, admin")

        try:
            account_type = int(parts[0].strip())
            credentials = parts[1].split(',')
            if len(credentials) < 2:
                raise ValueError("아이디와 비밀번호를 쉼표로 구분하세요. 예시: admin, admin")
            username = credentials[0].strip()
            password = credentials[1].strip()
        except ValueError as e:
            # Preserve the original exception context for better debugging and satisfy pylint W0707
            raise ValueError(f"입력 형식 오류: {e}") from e

        return {
            "account_type": account_type,
            "username": username,
            "password": password,
        }

    def check_http_status(self, url: str, max_retry: int = 3, ignore_cert: bool = True) -> bool:
        """
        Checks the HTTP status of the given URL.
@@ -158,20 +206,46 @@ class MultipleTest(unittest.TestCase):
        self.accept_next_alert = True

    def _take_screenshot(self, driver, action_name):
        """Saves a screenshot with a unique, descriptive filename."""
        """Saves screenshots according to the screenshot_all flag.

        - If self.screenshot_all is True -> save for every driver.
        - Otherwise only save for driver.instance_number == 1 or == self.max_connections.
        """
        try:
            inst = getattr(driver, "instance_number", None)
            if inst is None:
                # No instance number set; skip silently
                return

            # If debug flag set -> save for everyone
            if getattr(self, "screenshot_all", False):
                do_save = True
            else:
                last_inst = getattr(self, "max_connections", None)
                # Default behavior: only save for instance 1 and planned last instance
                do_save = (inst == 1) or (last_inst is not None and inst == last_inst)

            if not do_save:
                return

            # proceed to save screenshot for allowed instances
            time.sleep(0.2)
            session_folder = os.path.join(self.screenshot_base_dir, f"session_{driver.instance_number}")
            session_folder = os.path.join(self.screenshot_base_dir, f"session_{inst}")
            os.makedirs(session_folder, exist_ok=True)
            counter = getattr(driver, 'screenshot_counter', 0) + 1
            setattr(driver, 'screenshot_counter', counter)

            counter = getattr(driver, "screenshot_counter", 0) + 1
            setattr(driver, "screenshot_counter", counter)
            counter_str = str(counter).zfill(4)
            filename = os.path.join(session_folder, f"{counter_str}_{action_name}.png")
            time.sleep(3) # Set common wait time before save screenshot(except custom dashboard)

            # allow page to settle (your previous behavior)
            time.sleep(3)
            driver.save_screenshot(filename)
            # optionally show debug
            # print(f"Screenshot saved: {filename}")
        except (FileNotFoundError, PermissionError):
            print(f"Error saving screenshot for session {driver.instance_number}")

        except (FileNotFoundError, PermissionError) as e:
            print(f"Error saving screenshot for session {getattr(driver, 'instance_number', 'unknown')}: {e}")

    def _interact_with_time_filters(self, driver):
        """
@@ -521,6 +595,10 @@ class MultipleTest(unittest.TestCase):
        # Guest Mode
        options.add_argument('--guest')
        options.add_argument("--window-size=1920,1080")
        headless = os.getenv("HEADLESS", "").lower()
        if headless in ("0", "false", "n", "no"):
            pass
        else:
            options.add_argument("--headless=new")
        # options.add_argument("--disable-gpu")
        options.binary_location = self.binary_location