Commit 5b3c0763 authored by Sergey Pinus's avatar Sergey Pinus
Browse files

Fix deeplx

parent cdb1ebfb
Loading
Loading
Loading
Loading
+119 −130
Original line number Diff line number Diff line
@@ -33,104 +33,82 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

from .base import *
# import signal

# class TimeoutException(Exception):
#     pass

# def timeout_handler(signum, frame):
#     raise TimeoutException("Timed out!")

import random
import time
import json
import httpx
from langdetect import detect
import brotli
import gzip  # Import gzip module for handling gzip compression
import gzip
import re
from typing import Dict, List

from modules.translators.base import BaseTranslator, register_translator
from utils.logger import logger as LOGGER


# Base URL of the DeepL JSON-RPC endpoint; the request URLs are built from it.
deeplAPI_base = "https://www2.deepl.com/jsonrpc"
# Client identifier appended to every request URL as a query-string fragment.
deepl_client_params = "client=chrome-extension,1.28.0"
# Default headers attached to every request made through the shared HTTP
# client. The values present the caller as the DeepL iOS application.
# NOTE(review): deepl_client_params still announces the Chrome extension while
# these headers announce the iOS app — confirm the mix is intentional.
headers = {
    "Content-Type":     "application/json",
    "User-Agent":       "DeepL/1627620 CFNetwork/3826.500.62.2.1 Darwin/24.4.0",
    "Accept":           "*/*",
    "X-App-Os-Name":    "iOS",
    "X-App-Os-Version": "18.4.0",
    "Accept-Language":  "en-US,en;q=0.9",
    "Accept-Encoding":  "gzip, deflate, br",
    "X-App-Device":     "iPhone16,2",
    "Referer":          "https://www.deepl.com/",
    "X-Product":        "translator",
    "X-App-Build":      "1627620",
    "X-App-Version":    "25.1",
}


class TooManyRequestsException(Exception):
    """Error type for an HTTP 429 ("too many requests") answer.

    The string form is a fixed, user-facing explanation and does not depend
    on any arguments passed to the constructor.
    """

    # Fixed text returned by ``str()`` for every instance.
    _MESSAGE = "Error: Too many requests, your IP has been blocked by DeepL temporarily, please don't request it frequently in a short time."

    def __str__(self):
        return self._MESSAGE


def detectLang(translateText: str) -> str:
    """Detect the language of ``translateText`` with ``langdetect``.

    Args:
        translateText: Text whose language should be guessed.

    Returns:
        The upper-cased code reported by ``langdetect.detect``; ``"EN"`` when
        the text is empty/blank or detection fails for any reason.
    """
    # Nothing to analyse: answer with the fallback directly instead of relying
    # on the detector to raise for empty input.
    if not translateText or not translateText.strip():
        return "EN"
    try:
        return detect(translateText).upper()
    except Exception:
        # Best-effort by design: any detector failure falls back to English.
        # (``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt/SystemExit still propagate.)
        return "EN"


def getICount(translateText: str) -> int:
    """Return how many lowercase ``i`` characters ``translateText`` contains.

    The count is fed into :func:`getTimestamp` when a translation request is
    assembled.
    """
    return translateText.count("i")


def getRandomNumber() -> int:
    """Return a random request id.

    Returns:
        A multiple of 1000 in the inclusive range
        ``8_300_000_000 .. 8_399_999_000``.
    """
    # Use the module-level generator (already seeded by the interpreter).
    # Re-seeding a fresh generator from the clock on every call could yield
    # identical ids for back-to-back calls when the clock is coarse.
    return random.randint(8300000, 8399999) * 1000


def getTimestamp(iCount: int) -> int:
    """Build the millisecond timestamp placed in a translation request.

    Args:
        iCount: Number of ``i`` characters in the text being translated.

    Returns:
        The current time in milliseconds when ``iCount`` is 0; otherwise the
        current time moved up to the next multiple of ``iCount + 1``.
    """
    now_ms = int(time.time() * 1000)
    if iCount != 0:
        step = iCount + 1
        # Drop the remainder, then advance one full step: the result is an
        # exact multiple of ``step`` that lies just after ``now_ms``.
        return now_ms - (now_ms % step) + step
    return now_ms

def format_post_data(post_data_dict, id_val):
    """Serialise the request payload with id-dependent spacing after ``"method"``.

    Args:
        post_data_dict: JSON-serialisable request payload.
        id_val: Integer that selects the spacing variant.

    Returns:
        The JSON text. The first ``"method"`` key is written as
        ``"method" : "`` when ``(id_val + 5) % 29 == 0`` or
        ``(id_val + 3) % 13 == 0``, and as ``"method": "`` otherwise.
    """
    post_data_str = json.dumps(post_data_dict, ensure_ascii=False)
    wide = (id_val + 5) % 29 == 0 or (id_val + 3) % 13 == 0
    wanted = '"method" : "' if wide else '"method": "'
    # ``json.dumps`` writes ``"method": "`` with its default separators, so
    # searching only for the compact ``"method":"`` form never matched and the
    # wide variant was never produced. Accept either serialised form.
    for emitted in ('"method": "', '"method":"'):
        if emitted in post_data_str:
            # Only the first occurrence (the top-level key) is rewritten.
            return post_data_str.replace(emitted, wanted, 1)
    return post_data_str

def is_richtext(text: str) -> bool:
    """Tell whether ``text`` contains at least one ``<...>`` tag-like span.

    A span counts when a ``<`` is followed by one or more characters other
    than ``>`` and then a closing ``>``.
    """
    tag_match = re.search(r'<[^>]+>', text)
    return tag_match is not None

def deepl_split_text(text: str, tag_handling: bool = None, proxies=None) -> dict:
    """Sends request to DeepL API to split text before translation."""
    source_lang = 'auto'
    text_type = 'richtext' if (tag_handling or is_richtext(text)) else 'plaintext' # Uses is_richtext and tag_handling
    text_type = 'richtext' if (tag_handling or is_richtext(text)) else 'plaintext'
    postData = {
        "jsonrpc": "2.0",
        "method": "LMT_split_text",
@@ -146,42 +124,42 @@ def deepl_split_text(text: str, tag_handling: bool = None, proxies=None) -> dict
        },
        "id": getRandomNumber()
    }
    postDataStr = format_post_data(postData, getRandomNumber()) # Uses getRandomNumber for ID
    url = f"{deeplAPI_base}?{deepl_client_params}&method=LMT_split_text" # URL as in deepx.py
    postDataStr = format_post_data(postData, getRandomNumber())
    url = f"{deeplAPI_base}?{deepl_client_params}&method=LMT_split_text"
    return make_deepl_request(url, postDataStr, proxies)


def make_deepl_request(url, postDataStr, proxies):
    """POST a prepared JSON-RPC payload to DeepL and return the decoded reply.

    Args:
        url: Full request URL, including the query string.
        postDataStr: Request body, already serialised to JSON text.
        proxies: Proxy URL for the HTTP client; ``None`` or an empty string
            means no explicit proxy.

    Returns:
        The decoded JSON response on success. On any failure a dict of the
        form ``{'error': <message>}``: the response text for a non-success
        status, the exception text for an ``httpx.HTTPError``, or a fixed
        message when the body cannot be decoded.
    """
    # NOTE(review): verify=False turns off TLS certificate verification for
    # these requests. It is kept because the original code sets it; confirm it
    # is really required (e.g. for intercepting proxies) before relying on it.
    # An empty proxy string is mapped to None so the client is not handed an
    # unusable proxy URL. The context manager closes the client's connection
    # pool on every exit path.
    with httpx.Client(headers=headers, proxy=proxies or None, timeout=30, verify=False) as client:
        try:
            LOGGER.debug(f"Request JSON: {postDataStr}")  # log the payload before sending
            resp = client.post(url=url, content=postDataStr)
            if not resp.is_success:
                # Log the failed request together with what the server answered.
                LOGGER.error(f"Request failed with status code: {resp.status_code}, response text: {resp.text}")
                return {'error': resp.text}

            try:
                return resp.json()
            except json.JSONDecodeError:
                pass

            # The body was not plain JSON: try the raw bytes as gzip, then as
            # brotli, and report the last failure if neither one works.
            last_error = None
            for decompress in (gzip.decompress, brotli.decompress):
                try:
                    return json.loads(decompress(resp.content))
                except Exception as e:
                    last_error = e
            LOGGER.error(f"Decompression error: {last_error}, content: {resp.content[:100]}")
            return {'error': 'Failed to decompress response'}

        except httpx.HTTPError as e:
            LOGGER.error(f"HTTPError: {e}")
            LOGGER.error(f"Request URL: {url}")
            LOGGER.error(f"Request Data: {postDataStr}")
            return {'error': str(e)}


def deepl_response_to_deeplx(data: dict) -> dict:
    """Transforms DeepL API response to DeepLX format, including alternatives."""
    alternatives = []
    if 'result' in data and 'translations' in data['result'] and len(data['result']['translations']) > 0:
        num_beams = len(data['result']['translations'][0].get('beams', []))
@@ -216,40 +194,39 @@ def translate_core(
    sourceLang,
    targetLang,
    tagHandling,
    dl_session = "", # dl_session for Pro API, not used in this free version
    dl_session = "",
    proxies=None,
):
    """Core translation function, orchestrates split text and handle jobs requests."""
    if not text:
        return {"code": 404, "message": "No text to translate"}

    split_result_json = deepl_split_text(text, tagHandling in ("html", "xml"), proxies) # tag_handling_bool is calculated here, using deepl_split_text
    if 'error' in split_result_json: # Error check from deepl_split_text
        return {"code": 503, "message": split_result_json['error']} # 503 Service Unavailable, returns error message
    split_result_json = deepl_split_text(text, tagHandling in ("html", "xml"), proxies)
    if 'error' in split_result_json:
        return {"code": 503, "message": split_result_json['error']}

    if sourceLang == "auto" or not sourceLang: # Language detection if sourceLang is auto or not provided
    if sourceLang == "auto" or not sourceLang:
        sourceLang_detected = split_result_json.get("result", {}).get("lang", {}).get("detected")
        if sourceLang_detected:
            sourceLang = sourceLang_detected.lower() # tolower() as in deepx.py
            sourceLang = sourceLang_detected.lower()
        else:
            sourceLang = detectLang(text).lower() # tolower() and fallback to langdetect
            sourceLang = detectLang(text).lower()

    i_count = getICount(text) # getICount
    i_count = getICount(text)

    jobs = []
    try: # try-except for accessing chunks as in deepx.py
    try:
        chunks = split_result_json['result']['texts'][0]['chunks']
    except (KeyError, IndexError, TypeError): # TypeError added for robustness
        return {'code': 503, 'message': 'Unexpected response structure from split_text'} # Returns error if split_text response structure is incorrect
    except (KeyError, IndexError, TypeError):
        return {'code': 503, 'message': 'Unexpected response structure from split_text'}

    for idx, chunk in enumerate(chunks):
        sentence = chunk['sentences'][0] # sentence as in deepx.py
        context_before = [chunks[idx-1]['sentences'][0]['text']] if idx > 0 else [] # context_before as in deepx.py
        context_after = [chunks[idx+1]['sentences'][0]['text']] if idx < len(chunks) - 1 else [] # context_after as in deepx.py
        sentence = chunk['sentences'][0]
        context_before = [chunks[idx-1]['sentences'][0]['text']] if idx > 0 else []
        context_after = [chunks[idx+1]['sentences'][0]['text']] if idx < len(chunks) - 1 else []

        jobs.append({ # job as in deepx.py
        jobs.append({
            "kind": "default",
            "preferred_num_beams": 4, # preferred_num_beams = 4 as in deepx.py
            "preferred_num_beams": 4,
            "raw_en_context_before": context_before,
            "raw_en_context_after": context_after,
            "sentences": [{
@@ -260,76 +237,87 @@ def translate_core(
        })


    targetLang_code = targetLang.upper() # targetLang_code to upper
    postData = { # postData for LMT_handle_jobs as in deepx.py
    targetLang_code = targetLang.upper()
    has_regional_variant = False
    if '-' in targetLang:
        targetLang_code = targetLang.split('-')[0].upper()
        has_regional_variant = True

    current_tag_handling = "plaintext" 
    postData = {
        "jsonrpc": "2.0",
        "method": "LMT_handle_jobs",
        "id": getRandomNumber(), # getRandomNumber for ID
        "id": getRandomNumber(),
        "params": {
            "commonJobParams": {
                "mode": "translate"
                "mode": "translate",
                "formality": "undefined",
                "transcribeAs": "romanize",
                "advancedMode": False,
                "textType": current_tag_handling,
                "wasSpoken": False,
            },
            "lang": {
                "source_lang_computed": sourceLang.upper(), # sourceLang to upper
                "target_lang": targetLang_code # targetLang_code (upper)
                "source_lang_user_selected": "auto",
                "target_lang": targetLang_code,
                "source_lang_computed": sourceLang.upper(),
            },
            "jobs": jobs,
            "priority": 1,
            "timestamp": getTimestamp(i_count) # timestamp
            "timestamp": getTimestamp(i_count)
        }
    }

    if has_regional_variant:
        postData["params"]["commonJobParams"]["regionalVariant"] = targetLang


    postDataStr = format_post_data(postData, getRandomNumber()) # format_post_data, getRandomNumber for ID
    url = f"{deeplAPI_base}?{deepl_client_params}&method=LMT_handle_jobs" # URL for LMT_handle_jobs
    translate_result_json = make_deepl_request(url, postDataStr, proxies) # make_deepl_request
    postDataStr = format_post_data(postData, getRandomNumber())
    LOGGER.debug(f"Request JSON before sending: {postDataStr}")
    url = f"{deeplAPI_base}?{deepl_client_params}&method=LMT_handle_jobs"
    translate_result_json = make_deepl_request(url, postDataStr, proxies)

    if 'error' in translate_result_json: # Error check from make_deepl_request
        return {"code": 503, "message": translate_result_json['error']} # Returns error if there is an error
    if 'error' in translate_result_json:
        return {"code": 503, "message": translate_result_json['error']}

    deeplx_result = deepl_response_to_deeplx(translate_result_json) # Transforms response using deepl_response_to_deeplx
    return deeplx_result # Returns result in DeepLX format
    deeplx_result = deepl_response_to_deeplx(translate_result_json)
    return deeplx_result


def translate(
    text,
    sourceLang=None,
    targetLang=None,
    numberAlternative=0,
    printResult=False,
    proxies=None,
):
    """Translate ``text`` and return the main translation.

    Args:
        text: Text to translate.
        sourceLang: Source language code; ``"auto"`` or a falsy value lets
            ``translate_core`` determine it.
        targetLang: Target language code.
        numberAlternative: Accepted for interface compatibility; not used by
            this function.
        printResult: When true, also print the translation.
        proxies: Proxy URL forwarded to ``translate_core``.

    Returns:
        The ``"data"`` entry of the result produced by ``translate_core``.

    Raises:
        Exception: When the result is empty or its ``"code"`` is not 200; the
            message carries the error text from the result when available.
    """
    # Explicitly request plain-text handling (no HTML/XML tag processing).
    tagHandling = "plaintext"
    result_json = translate_core(text, sourceLang, targetLang, tagHandling, proxies=proxies)

    # ``.get`` so that a result without a "code" key is reported through the
    # normal failure path below rather than as a KeyError.
    if result_json and result_json.get("code") == 200:
        translated = result_json["data"]
        if printResult:
            print(translated)
        return translated

    error_message = result_json.get("message", "Unknown error") if result_json else "Request failed"
    LOGGER.error(f"Translation error: {error_message}")
    raise Exception(f"Translation failed: {error_message}")


@register_translator('DeepL Free')
class DeepLX(BaseTranslator):
    """DeepL Free Translator class, implements BaseTranslator interface."""
    cht_require_convert = True
    params: Dict = {
        'delay': 0.0,
        'proxy': { # Proxy parameter definition, similar to ocr_google_lens.py
        'proxy': {
            'value': '',
            'description': 'Proxy address (e.g., http(s)://user:password@host:port or socks4/5://user:password@host:port)'
        },
    }
    concate_text = True
    concate_text = False

    def _setup_translator(self):
        """Sets up language map for DeepL Free translator."""
        self.lang_map = { # lang_map including '繁體中文'
        self.lang_map = {
            '简体中文': 'zh',
            '日本語': 'ja',
            'English': 'en',
@@ -359,29 +347,30 @@ class DeepLX(BaseTranslator):
            'украї́нська мо́ва': 'uk',
            '한국어': 'ko',
            'Arabic': 'ar',
            '繁體中文': 'zh-TW', # Added '繁體中文' and language code 'zh-TW'
            '繁體中文': 'zh-TW',
        }
        self.textblk_break = '\n'

    def __init__(self, source='auto', target='en', raise_unsupported_lang=True, **params):
        """Store the proxy setting, then run the base-class initialisation.

        Args:
            source: Source language passed on to the base class.
            target: Target language passed on to the base class.
            raise_unsupported_lang: Passed on to the base class.
            **params: Translator parameters. Only ``proxy`` is read here,
                either as a ``{'value': ...}`` mapping or as a plain string.
        """
        proxy_param = params.get('proxy') or {}
        proxy_value = proxy_param.get('value') if isinstance(proxy_param, dict) else proxy_param
        # An empty setting means "no proxy": store None instead of '' so the
        # HTTP client is never handed an empty proxy URL.
        self.proxy = proxy_value or None
        # The proxy is assigned before the base initialiser runs, as in the
        # original code. NOTE(review): presumably the base class may call
        # setup hooks during __init__ — confirm before reordering.
        super().__init__(source, target, raise_unsupported_lang=raise_unsupported_lang)


    def _translate(self, src_list: List[str]) -> List[str]:
        """Translate each text block line by line.

        Every block is split on ``'\\n'``, each line is translated on its own,
        and the translated lines are joined back with ``'\\n'``.

        Args:
            src_list: Text blocks to translate.

        Returns:
            One translated string per input block, in the same order. A line
            whose translation fails becomes an empty string; blank lines are
            passed through untranslated.
        """
        source = self.lang_map[self.lang_source]
        target = self.lang_map[self.lang_target]
        proxies = self.proxy

        result = []
        for text_block in src_list:
            translated_lines = []
            for line in text_block.split('\n'):
                if not line.strip():
                    # Nothing to translate: keep the line without calling
                    # translate(), which raises (and logs an error) for empty
                    # text.
                    translated_lines.append(line)
                    continue
                try:
                    translated_lines.append(translate(line, source, target, proxies=proxies))
                except Exception as e:
                    # One failing line must not abort the whole batch.
                    LOGGER.error(f"Translation failed for line: '{line}'. Error: {e}")
                    translated_lines.append('')
            result.append('\n'.join(translated_lines))
        return result