Loading .gitlab-ci.yml +6 −0 Original line number Diff line number Diff line Loading @@ -7,6 +7,10 @@ include: - component: "$CI_SERVER_FQDN/to-be-continuous/python/gitlab-ci-python@7.11" inputs: pylint-enabled: true # Bash template - component: "$CI_SERVER_FQDN/to-be-continuous/bash/gitlab-ci-bash@3.9" inputs: shellcheck-files: "scripts/*.sh" # your pipeline stages stages: Loading Loading @@ -82,6 +86,8 @@ release_job: artifacts: true - job: linux_slim_bin artifacts: true - job: bash-shellcheck artifacts: false before_script: - apk add --no-cache curl script: Loading .pylintrc +3 −3 Original line number Diff line number Diff line [MESSAGES CONTROL] # Disable the messages 'too-many-instance-attributes' (R0902), 'too-many-arguments' (R0913), and 'too-many-positional-arguments' (R0917). # disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments # Disable the messages 'too-many-instance-attributes' (R0902), 'too-many-arguments' (R0913), and 'too-many-positional-arguments' (R0917). + 'too-many-locals' (R0914) # disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments, too-many-locals # Same with above # Disable specific message IDs. disable=R0902, R0913, R0915, R0917 disable=R0902, R0913, R0915, R0917, R0914 [FORMAT] # Maximum number of characters on a single line. Loading load_check.py +70 −53 Original line number Diff line number Diff line Loading @@ -75,22 +75,23 @@ class MultipleTest(unittest.TestCase): print("잘못된 입력입니다. 기본값 '일반 접속(1)'으로 설정합니다.") self.function_to_execute = 1 print("\n6. [선택] 연결 유지 시간(분)(기본값: 120)") try: self.keep_connection_time = int(input().strip()) * 60 except ValueError: print("잘못된 입력입니다. 기본값 '120분'으로 설정합니다.") self.keep_connection_time = 120 * 60 print("\n7. [선택] 유지 커넥션 개수 / 최대 커넥션 수 까지 도달 시간(분)(기본값: 50 / 30)") print("\n6. [선택] 커넥션 수 / 테스트 시간(분)(기본값: 30개 / 120분)") try: pool_input = input().strip().split('/') self.max_connections = int(pool_input[0].strip()) self.ramp_up_time = int(pool_input[1].strip()) * 60 self.max_connections = int(pool_input[0].strip()) if pool_input[0].strip() else 30 self.keep_connection_time = int(pool_input[1].strip()) * 60 if len(pool_input) > 1 else 120 * 60 # ✅ Validation: ensure test time is at least 2x connections min_required = self.max_connections * 2 if (self.keep_connection_time // 60) < min_required: print(f"[WARN] 테스트 시간({self.keep_connection_time//60}분)이 커넥션 수({self.max_connections})의 2배({min_required}분)보다 짧습니다.") print(f"[INFO] 테스트 시간을 {min_required}분으로 자동 조정합니다.") self.keep_connection_time = min_required * 60 except (ValueError, IndexError): print("잘못된 입력입니다. 기본값 '50개 / 30분'으로 설정합니다.") self.max_connections = 50 self.ramp_up_time = 30 * 60 print("잘못된 입력입니다. 기본값 '30개 / 120분'으로 설정합니다.") self.max_connections = 30 self.keep_connection_time = 120 * 60 # Make Screenshots_YYMMDD_HHmmss folder timestamp = datetime.now().strftime("%y%m%d_%H%M%S") Loading @@ -115,6 +116,7 @@ class MultipleTest(unittest.TestCase): setattr(driver, 'screenshot_counter', counter) counter_str = str(counter).zfill(4) filename = os.path.join(session_folder, f"{counter_str}_{action_name}.png") time.sleep(3) # Set common wait time before save screenshot(except custom dashboard) driver.save_screenshot(filename) # print(f"Screenshot saved: {filename}") except (FileNotFoundError, PermissionError): Loading Loading @@ -199,7 +201,6 @@ class MultipleTest(unittest.TestCase): """Performs the login action.""" driver.get(address) self._take_screenshot(driver, "page_load") time.sleep(0.5) wait.until(EC.element_to_be_clickable((By.XPATH, "//input[@type='text']"))).click() driver.find_element(By.XPATH, "//input[@type='text']").clear() Loading @@ -209,7 +210,6 @@ class MultipleTest(unittest.TestCase): self._take_screenshot(driver, "credentials_entered") driver.find_element(By.XPATH, "//button[@type='button']").click() self._take_screenshot(driver, "login_button_click") time.sleep(0.5) def _dashboard_actions(self, driver): """Performs actions on the Dashboard.""" Loading @@ -218,14 +218,11 @@ class MultipleTest(unittest.TestCase): # Default dashbaord self._take_screenshot(driver, "dashboard_menu_click") driver.find_element(By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/div/article/div/div/div/div/div/div/div/div[2]/div/div/span[2]/span/span").click() time.sleep(1.5) self._take_screenshot(driver, "dashboard_default_first_module_click") driver.find_element(By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/div/article/div/div/div/div/div/div/div/div[2]/div[2]/div/span[2]/span/span").click() time.sleep(1.5) self._take_screenshot(driver, "dashboard_default_second_module_click") try: wait.until(EC.element_to_be_clickable((By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/div/article/div/div/div/div/div/div/div/div[2]/div[3]/div/span[2]/span/span"))).click() time.sleep(1.5) self._take_screenshot(driver, "dashboard_default_third_module_click") except (TimeoutException, NoSuchElementException): print("There is no third modules") Loading @@ -250,15 +247,12 @@ class MultipleTest(unittest.TestCase): driver.find_element(By.XPATH, "//div[@id='wrapper']/nav/ul/li[2]/a/div").click() self._take_screenshot(driver, "topology_map_menu_click") driver.find_element(By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/article/div/div/div/div/div/div/span[2]/span/span[2]").click() time.sleep(3) self._take_screenshot(driver, "topology_first_parent_element_click") try: wait.until(EC.element_to_be_clickable((By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/article/div/div/div/div/div/div[2]/div/div/span[2]/span/span[2]"))).click() time.sleep(3) self._take_screenshot(driver, "topology_child_element_click") except (TimeoutException, NoSuchElementException): driver.find_element(By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/article/div/div/div/div/div[2]/div/span[2]/span/span[2]").click() time.sleep(3) self._take_screenshot(driver, "topology_second_parent_element_click") def _performance_operation_actions(self, driver): Loading Loading @@ -308,8 +302,8 @@ class MultipleTest(unittest.TestCase): time.sleep(0.5) driver.find_element(By.XPATH, "//div[@id='wrapper']/nav/ul/li[5]/a/div").click() self._take_screenshot(driver, "log_analysis_menu_click") driver.find_element(By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/div/article/div/div/div/div/div/div/span[2]/span/span").click() self._take_screenshot(driver, "log_analysis_element_click") # driver.find_element(By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/div/article/div/div/div/div/div/div/span[2]/span/span").click() # self._take_screenshot(driver, "log_analysis_element_click") self._interact_with_time_filters_for_la(driver) def _event_status_actions(self, driver): Loading Loading @@ -455,7 +449,7 @@ class MultipleTest(unittest.TestCase): elif account_type == 3: print(f"Session {driver.instance_number}, User: {driver.user_id}: User account - no specific admin actions.") def _run_single_driver_actions(self, address, user_id, pw, account_type, driver_instance_number): def _run_single_driver_actions(self, address, user_id, pw, account_type, driver_instance_number, end_time_global): """Sets up and runs a single driver instance's actions.""" # Create a new driver instance for this thread options = webdriver.ChromeOptions() Loading @@ -480,16 +474,14 @@ class MultipleTest(unittest.TestCase): driver.implicitly_wait(10) # Common implicit wait for this driver self.drivers.append(driver) # Add this driver to the list for tearDown # Start the timer for the connection start_time = time.time() # Log in once at the start of the session wait = WebDriverWait(driver, 10) self._execute_single_action_and_measure(driver, self._login_action, "_login_action", address, user_id, pw, wait) print(f"Starting looped actions for session {driver_instance_number} with user: {user_id}") # Loop actions until the keep connection time is up while time.time() - start_time < self.keep_connection_time: # ✅ Loop until the global end time (all threads stop together) while time.time() < end_time_global: if self.function_to_execute == 1: self._execute_and_measure_actions(driver, account_type, self._run_driver1_actions) elif self.function_to_execute == 2: Loading @@ -497,7 +489,8 @@ class MultipleTest(unittest.TestCase): elif self.function_to_execute == 3: self._execute_and_measure_actions(driver, account_type, self._run_driver3_actions) print(f"Session {driver_instance_number} with user {user_id} finished its action loop.") print(f"Session {driver_instance_number} with user {user_id} finished at global stop time.") def _execute_and_measure_actions(self, driver, account_type, main_driver_func): """Executes the main driver function and measures its duration.""" Loading @@ -509,40 +502,64 @@ class MultipleTest(unittest.TestCase): def test_multiple(self): """Main test method to start multiple threads for load testing.""" address = self.dc_address threads_to_start = [] # Calculate the total number of connections to be made total_connections_to_start = sum(config["duplicate_count"] for config in self.account_configs) # Determine the number of connections to start per second to meet the ramp-up time if self.ramp_up_time > 0: threads_per_second = total_connections_to_start / self.ramp_up_time sleep_time_per_thread = 1.0 / threads_per_second if threads_per_second > 0 else 0 else: sleep_time_per_thread = 0 driver_instance_number = 1 # Create a list of all thread arguments to be started for config in self.account_configs: # Use max_connections from input total_connections_to_start = self.max_connections # Just take the first account config for all threads config = self.account_configs[0] account_type = config["account_type"] username = config["username"] password = config["password"] duplicate_count = config["duplicate_count"] for _ in range(duplicate_count): # Build the thread args list for driver_instance_number in range(1, total_connections_to_start + 1): threads_to_start.append((address, username, password, account_type, driver_instance_number)) driver_instance_number += 1 # Shuffle the list to randomize the order of connections # Shuffle the list (optional randomization) random.shuffle(threads_to_start) active_threads = [] # Ramp up the connections linearly over time for args in threads_to_start: thread = threading.Thread(target=self._run_single_driver_actions, args=args) start_time = datetime.now() end_time_global = time.time() + self.keep_connection_time end_time_dt = datetime.fromtimestamp(end_time_global) print(f"[INFO] Test started at {start_time.strftime('%H:%M:%S')} with {total_connections_to_start} planned threads") print(f"[INFO] Global stop time set to {end_time_dt.strftime('%H:%M:%S')}") # 🚀 Start first thread immediately, then one new thread every minute for i, args in enumerate(threads_to_start, start=1): # Pass the global end time into each thread thread = threading.Thread( target=self._run_single_driver_actions, args=(*args, end_time_global) ) active_threads.append(thread) thread.start() if sleep_time_per_thread > 0: time.sleep(sleep_time_per_thread) # Print status print( f"[INFO] Started thread #{i} at {datetime.now().strftime('%H:%M:%S')} " f"(Active threads: {len(active_threads)})" ) time.sleep(60) # wait 1 minute before starting the next thread # Wait for all threads to complete their timed action loops for thread in active_threads: thread.join() end_time = datetime.now() duration = (end_time - start_time).total_seconds() / 60.0 # minutes print( f"[SUMMARY] All {total_connections_to_start} threads stopped together at global stop time " f"{end_time_dt.strftime('%H:%M:%S')} (Total run: {duration:.1f} minutes)" ) def is_element_present(self, how, what): """Checks if an element is present on the page.""" try: Loading scripts/release.sh 100644 → 100755 +16 −7 Original line number Diff line number Diff line Loading @@ -21,11 +21,20 @@ SHORT_TAG="$CI_COMMIT_SHORT_SHA" BRANCH="$CI_COMMIT_BRANCH" # ✅ Map default branch (main/master) to "latest" if [ "$CI_COMMIT_BRANCH" = "$CI_DEFAULT_BRANCH" ]; then case "$CI_COMMIT_BRANCH" in "$CI_DEFAULT_BRANCH") BRANCH_TAG="latest" else BRANCH_TAG="${CI_COMMIT_REF_NAME}" fi ;; *dev*|*test*|*ci*|*pre*|*production*) case "$CI_COMMIT_BRANCH" in *-ci) BRANCH_TAG="$CI_COMMIT_BRANCH" ;; *) BRANCH_TAG="${CI_COMMIT_BRANCH}-ci" ;; esac ;; *) BRANCH_TAG="$CI_COMMIT_REF_NAME" ;; esac echo "📦 Publishing Release for $CI_PROJECT_PATH @ $SHORT_TAG ($BRANCH → $BRANCH_TAG)" Loading @@ -33,9 +42,9 @@ echo "📦 Publishing Release for $CI_PROJECT_PATH @ $SHORT_TAG ($BRANCH → $BR mkdir -p release_files echo "🔍 Collecting build artifacts for commit $CI_COMMIT_SHORT_SHA ..." cp *_${CI_COMMIT_SHORT_SHA}_* release_files/ 2>/dev/null || true cp ./*_"${CI_COMMIT_SHORT_SHA}"_* release_files/ 2>/dev/null || true if [ -z "$(ls -A release_files)" ]; then if [ -z "$(ls -A release_files 2>/dev/null)" ]; then echo "❌ No artifacts found matching pattern '*_${CI_COMMIT_SHORT_SHA}_*'" exit 1 fi Loading Loading
.gitlab-ci.yml +6 −0 Original line number Diff line number Diff line Loading @@ -7,6 +7,10 @@ include: - component: "$CI_SERVER_FQDN/to-be-continuous/python/gitlab-ci-python@7.11" inputs: pylint-enabled: true # Bash template - component: "$CI_SERVER_FQDN/to-be-continuous/bash/gitlab-ci-bash@3.9" inputs: shellcheck-files: "scripts/*.sh" # your pipeline stages stages: Loading Loading @@ -82,6 +86,8 @@ release_job: artifacts: true - job: linux_slim_bin artifacts: true - job: bash-shellcheck artifacts: false before_script: - apk add --no-cache curl script: Loading
.pylintrc +3 −3 Original line number Diff line number Diff line [MESSAGES CONTROL] # Disable the messages 'too-many-instance-attributes' (R0902), 'too-many-arguments' (R0913), and 'too-many-positional-arguments' (R0917). # disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments # Disable the messages 'too-many-instance-attributes' (R0902), 'too-many-arguments' (R0913), and 'too-many-positional-arguments' (R0917). + 'too-many-locals' (R0914) # disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments, too-many-locals # Same with above # Disable specific message IDs. disable=R0902, R0913, R0915, R0917 disable=R0902, R0913, R0915, R0917, R0914 [FORMAT] # Maximum number of characters on a single line. Loading
load_check.py +70 −53 Original line number Diff line number Diff line Loading @@ -75,22 +75,23 @@ class MultipleTest(unittest.TestCase): print("잘못된 입력입니다. 기본값 '일반 접속(1)'으로 설정합니다.") self.function_to_execute = 1 print("\n6. [선택] 연결 유지 시간(분)(기본값: 120)") try: self.keep_connection_time = int(input().strip()) * 60 except ValueError: print("잘못된 입력입니다. 기본값 '120분'으로 설정합니다.") self.keep_connection_time = 120 * 60 print("\n7. [선택] 유지 커넥션 개수 / 최대 커넥션 수 까지 도달 시간(분)(기본값: 50 / 30)") print("\n6. [선택] 커넥션 수 / 테스트 시간(분)(기본값: 30개 / 120분)") try: pool_input = input().strip().split('/') self.max_connections = int(pool_input[0].strip()) self.ramp_up_time = int(pool_input[1].strip()) * 60 self.max_connections = int(pool_input[0].strip()) if pool_input[0].strip() else 30 self.keep_connection_time = int(pool_input[1].strip()) * 60 if len(pool_input) > 1 else 120 * 60 # ✅ Validation: ensure test time is at least 2x connections min_required = self.max_connections * 2 if (self.keep_connection_time // 60) < min_required: print(f"[WARN] 테스트 시간({self.keep_connection_time//60}분)이 커넥션 수({self.max_connections})의 2배({min_required}분)보다 짧습니다.") print(f"[INFO] 테스트 시간을 {min_required}분으로 자동 조정합니다.") self.keep_connection_time = min_required * 60 except (ValueError, IndexError): print("잘못된 입력입니다. 기본값 '50개 / 30분'으로 설정합니다.") self.max_connections = 50 self.ramp_up_time = 30 * 60 print("잘못된 입력입니다. 기본값 '30개 / 120분'으로 설정합니다.") self.max_connections = 30 self.keep_connection_time = 120 * 60 # Make Screenshots_YYMMDD_HHmmss folder timestamp = datetime.now().strftime("%y%m%d_%H%M%S") Loading @@ -115,6 +116,7 @@ class MultipleTest(unittest.TestCase): setattr(driver, 'screenshot_counter', counter) counter_str = str(counter).zfill(4) filename = os.path.join(session_folder, f"{counter_str}_{action_name}.png") time.sleep(3) # Set common wait time before save screenshot(except custom dashboard) driver.save_screenshot(filename) # print(f"Screenshot saved: {filename}") except (FileNotFoundError, PermissionError): Loading Loading @@ -199,7 +201,6 @@ class MultipleTest(unittest.TestCase): """Performs the login action.""" driver.get(address) self._take_screenshot(driver, "page_load") time.sleep(0.5) wait.until(EC.element_to_be_clickable((By.XPATH, "//input[@type='text']"))).click() driver.find_element(By.XPATH, "//input[@type='text']").clear() Loading @@ -209,7 +210,6 @@ class MultipleTest(unittest.TestCase): self._take_screenshot(driver, "credentials_entered") driver.find_element(By.XPATH, "//button[@type='button']").click() self._take_screenshot(driver, "login_button_click") time.sleep(0.5) def _dashboard_actions(self, driver): """Performs actions on the Dashboard.""" Loading @@ -218,14 +218,11 @@ class MultipleTest(unittest.TestCase): # Default dashbaord self._take_screenshot(driver, "dashboard_menu_click") driver.find_element(By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/div/article/div/div/div/div/div/div/div/div[2]/div/div/span[2]/span/span").click() time.sleep(1.5) self._take_screenshot(driver, "dashboard_default_first_module_click") driver.find_element(By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/div/article/div/div/div/div/div/div/div/div[2]/div[2]/div/span[2]/span/span").click() time.sleep(1.5) self._take_screenshot(driver, "dashboard_default_second_module_click") try: wait.until(EC.element_to_be_clickable((By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/div/article/div/div/div/div/div/div/div/div[2]/div[3]/div/span[2]/span/span"))).click() time.sleep(1.5) self._take_screenshot(driver, "dashboard_default_third_module_click") except (TimeoutException, NoSuchElementException): print("There is no third modules") Loading @@ -250,15 +247,12 @@ class MultipleTest(unittest.TestCase): driver.find_element(By.XPATH, "//div[@id='wrapper']/nav/ul/li[2]/a/div").click() self._take_screenshot(driver, "topology_map_menu_click") driver.find_element(By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/article/div/div/div/div/div/div/span[2]/span/span[2]").click() time.sleep(3) self._take_screenshot(driver, "topology_first_parent_element_click") try: wait.until(EC.element_to_be_clickable((By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/article/div/div/div/div/div/div[2]/div/div/span[2]/span/span[2]"))).click() time.sleep(3) self._take_screenshot(driver, "topology_child_element_click") except (TimeoutException, NoSuchElementException): driver.find_element(By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/article/div/div/div/div/div[2]/div/span[2]/span/span[2]").click() time.sleep(3) self._take_screenshot(driver, "topology_second_parent_element_click") def _performance_operation_actions(self, driver): Loading Loading @@ -308,8 +302,8 @@ class MultipleTest(unittest.TestCase): time.sleep(0.5) driver.find_element(By.XPATH, "//div[@id='wrapper']/nav/ul/li[5]/a/div").click() self._take_screenshot(driver, "log_analysis_menu_click") driver.find_element(By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/div/article/div/div/div/div/div/div/span[2]/span/span").click() self._take_screenshot(driver, "log_analysis_element_click") # driver.find_element(By.XPATH, "//div[@id='wrapper']/div/div/aside/div/div/div/div/article/div/div/div/div/div/div/span[2]/span/span").click() # self._take_screenshot(driver, "log_analysis_element_click") self._interact_with_time_filters_for_la(driver) def _event_status_actions(self, driver): Loading Loading @@ -455,7 +449,7 @@ class MultipleTest(unittest.TestCase): elif account_type == 3: print(f"Session {driver.instance_number}, User: {driver.user_id}: User account - no specific admin actions.") def _run_single_driver_actions(self, address, user_id, pw, account_type, driver_instance_number): def _run_single_driver_actions(self, address, user_id, pw, account_type, driver_instance_number, end_time_global): """Sets up and runs a single driver instance's actions.""" # Create a new driver instance for this thread options = webdriver.ChromeOptions() Loading @@ -480,16 +474,14 @@ class MultipleTest(unittest.TestCase): driver.implicitly_wait(10) # Common implicit wait for this driver self.drivers.append(driver) # Add this driver to the list for tearDown # Start the timer for the connection start_time = time.time() # Log in once at the start of the session wait = WebDriverWait(driver, 10) self._execute_single_action_and_measure(driver, self._login_action, "_login_action", address, user_id, pw, wait) print(f"Starting looped actions for session {driver_instance_number} with user: {user_id}") # Loop actions until the keep connection time is up while time.time() - start_time < self.keep_connection_time: # ✅ Loop until the global end time (all threads stop together) while time.time() < end_time_global: if self.function_to_execute == 1: self._execute_and_measure_actions(driver, account_type, self._run_driver1_actions) elif self.function_to_execute == 2: Loading @@ -497,7 +489,8 @@ class MultipleTest(unittest.TestCase): elif self.function_to_execute == 3: self._execute_and_measure_actions(driver, account_type, self._run_driver3_actions) print(f"Session {driver_instance_number} with user {user_id} finished its action loop.") print(f"Session {driver_instance_number} with user {user_id} finished at global stop time.") def _execute_and_measure_actions(self, driver, account_type, main_driver_func): """Executes the main driver function and measures its duration.""" Loading @@ -509,40 +502,64 @@ class MultipleTest(unittest.TestCase): def test_multiple(self): """Main test method to start multiple threads for load testing.""" address = self.dc_address threads_to_start = [] # Calculate the total number of connections to be made total_connections_to_start = sum(config["duplicate_count"] for config in self.account_configs) # Determine the number of connections to start per second to meet the ramp-up time if self.ramp_up_time > 0: threads_per_second = total_connections_to_start / self.ramp_up_time sleep_time_per_thread = 1.0 / threads_per_second if threads_per_second > 0 else 0 else: sleep_time_per_thread = 0 driver_instance_number = 1 # Create a list of all thread arguments to be started for config in self.account_configs: # Use max_connections from input total_connections_to_start = self.max_connections # Just take the first account config for all threads config = self.account_configs[0] account_type = config["account_type"] username = config["username"] password = config["password"] duplicate_count = config["duplicate_count"] for _ in range(duplicate_count): # Build the thread args list for driver_instance_number in range(1, total_connections_to_start + 1): threads_to_start.append((address, username, password, account_type, driver_instance_number)) driver_instance_number += 1 # Shuffle the list to randomize the order of connections # Shuffle the list (optional randomization) random.shuffle(threads_to_start) active_threads = [] # Ramp up the connections linearly over time for args in threads_to_start: thread = threading.Thread(target=self._run_single_driver_actions, args=args) start_time = datetime.now() end_time_global = time.time() + self.keep_connection_time end_time_dt = datetime.fromtimestamp(end_time_global) print(f"[INFO] Test started at {start_time.strftime('%H:%M:%S')} with {total_connections_to_start} planned threads") print(f"[INFO] Global stop time set to {end_time_dt.strftime('%H:%M:%S')}") # 🚀 Start first thread immediately, then one new thread every minute for i, args in enumerate(threads_to_start, start=1): # Pass the global end time into each thread thread = threading.Thread( target=self._run_single_driver_actions, args=(*args, end_time_global) ) active_threads.append(thread) thread.start() if sleep_time_per_thread > 0: time.sleep(sleep_time_per_thread) # Print status print( f"[INFO] Started thread #{i} at {datetime.now().strftime('%H:%M:%S')} " f"(Active threads: {len(active_threads)})" ) time.sleep(60) # wait 1 minute before starting the next thread # Wait for all threads to complete their timed action loops for thread in active_threads: thread.join() end_time = datetime.now() duration = (end_time - start_time).total_seconds() / 60.0 # minutes print( f"[SUMMARY] All {total_connections_to_start} threads stopped together at global stop time " f"{end_time_dt.strftime('%H:%M:%S')} (Total run: {duration:.1f} minutes)" ) def is_element_present(self, how, what): """Checks if an element is present on the page.""" try: Loading
scripts/release.sh 100644 → 100755 +16 −7 Original line number Diff line number Diff line Loading @@ -21,11 +21,20 @@ SHORT_TAG="$CI_COMMIT_SHORT_SHA" BRANCH="$CI_COMMIT_BRANCH" # ✅ Map default branch (main/master) to "latest" if [ "$CI_COMMIT_BRANCH" = "$CI_DEFAULT_BRANCH" ]; then case "$CI_COMMIT_BRANCH" in "$CI_DEFAULT_BRANCH") BRANCH_TAG="latest" else BRANCH_TAG="${CI_COMMIT_REF_NAME}" fi ;; *dev*|*test*|*ci*|*pre*|*production*) case "$CI_COMMIT_BRANCH" in *-ci) BRANCH_TAG="$CI_COMMIT_BRANCH" ;; *) BRANCH_TAG="${CI_COMMIT_BRANCH}-ci" ;; esac ;; *) BRANCH_TAG="$CI_COMMIT_REF_NAME" ;; esac echo "📦 Publishing Release for $CI_PROJECT_PATH @ $SHORT_TAG ($BRANCH → $BRANCH_TAG)" Loading @@ -33,9 +42,9 @@ echo "📦 Publishing Release for $CI_PROJECT_PATH @ $SHORT_TAG ($BRANCH → $BR mkdir -p release_files echo "🔍 Collecting build artifacts for commit $CI_COMMIT_SHORT_SHA ..." cp *_${CI_COMMIT_SHORT_SHA}_* release_files/ 2>/dev/null || true cp ./*_"${CI_COMMIT_SHORT_SHA}"_* release_files/ 2>/dev/null || true if [ -z "$(ls -A release_files)" ]; then if [ -z "$(ls -A release_files 2>/dev/null)" ]; then echo "❌ No artifacts found matching pattern '*_${CI_COMMIT_SHORT_SHA}_*'" exit 1 fi Loading