Unverified Commit f162d5a4 authored by dmMaze's avatar dmMaze Committed by GitHub
Browse files

Merge pull request #632 from bropines/paddle_ocr

Add Paddle ocr and EasyOCR
parents a4c50344 3d8edf32
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -216,6 +216,7 @@ This project is heavily dependent upon [manga-image-translator](https://github.c
   * The current implementation uses OCR on each textblock individually, resulting in slower speed and no significant improvement in accuracy. It is not recommended. If needed, please use the Tuanzi Detector instead.
   * When using the Tuanzi Detector for text detection, it is recommended to set OCR to none_ocr to directly read the text, saving time and reducing the number of requests.
   * For detailed instructions, see **Tuanzi OCR Instructions**: ([Chinese](doc/团子OCR说明.md) & [Brazilian Portuguese](doc/Manual_TuanziOCR_pt-BR.md) only)
* PaddleOCR has been added as an "optional" module. In Debug mode you will see a message if it is not installed; you can install it by following the instructions given in that message. If you don't want to install the packages yourself, just uncomment (remove the `#` from) the lines with paddlepaddle(gpu) and paddleocr. Do this at your own risk. For me (bropines) and two testers everything installed fine, but you may run into an error. If you do, open an issue and tag me.

## Inpainting
  * AOT is from [manga-image-translator](https://github.com/zyddnys/manga-image-translator).
+3 −0
Original line number Diff line number Diff line
@@ -5,6 +5,9 @@ cd %~dp0

@echo off

:: Prepend the torch\lib directory found under ballontrans_pylibs_win (next to this script) to PATH.
:: NOTE(review): presumably needed so PaddleOCR can load the native libraries shipped with PyTorch - confirm.
set "PADDLE_PATH=%~dp0ballontrans_pylibs_win\Lib\site-packages\torch\lib"
set "PATH=%PADDLE_PATH%;%PATH%"

@REM if not defined PYTHON (set PATH=pylibs;pylibs\Scripts;%%PATH%%
set PATH=ballontrans_pylibs_win;ballontrans_pylibs_win\Scripts;PortableGit\cmd;%PATH%
+36 −12
Original line number Diff line number Diff line
@@ -191,6 +191,16 @@ class LensAPI:
        else:
            raise ValueError("Invalid response method")

def format_ocr_result(result):
    """Render an OCR result as indented text for debug logging.

    Args:
        result: The value returned by the Lens API wrapper. A mapping is
            expected to carry ``language`` and ``text_with_coordinates``
            (a list of items with ``text`` and ``coordinates`` keys).

    Returns:
        str: For a mapping, a ``json5`` dump with one ``"text: coordinates"``
        string per item. Any other value is returned as ``str(result)``.
    """
    # The caller keeps a list of plain-string results to ignore, so a value
    # without ``.get`` can reach this function; formatting must not raise on it.
    if not callable(getattr(result, "get", None)):
        return str(result)

    formatted_result = {
        "language": result.get("language", ""),
        "text_with_coordinates": [
            f"{item['text']}: {item['coordinates']}"
            for item in result.get("text_with_coordinates", [])
        ]
    }
    return json5.dumps(formatted_result, indent=4, ensure_ascii=False)

@register_OCR('google_lens')
class OCRLensAPI(OCRBase):
    params = {
@@ -228,7 +238,10 @@ class OCRLensAPI(OCRBase):
    
    @property
    def request_delay(self):
        """Value of the ``delay`` parameter as a float.

        Returns:
            float: ``float(self.get_param_value('delay'))``, or ``1.0`` when
            the stored value cannot be converted.
        """
        # The unconditional early return that used to precede this block made
        # the conversion below unreachable; only the guarded form is kept.
        try:
            return float(self.get_param_value('delay'))
        except (ValueError, TypeError):
            return 1.0

    @property
    def newline_handling(self):
@@ -247,6 +260,11 @@ class OCRLensAPI(OCRBase):
        return self.get_param_value('proxy')

    def __init__(self, **params) -> None:
        """Normalise ``delay``, initialise the base class and create the Lens client.

        Args:
            **params: Module parameters forwarded to the base initialiser.
                When ``delay`` is present it is first converted to ``float``;
                a value that cannot be converted is replaced by ``1.0``.
        """
        if 'delay' in params:
            raw_delay = params['delay']
            try:
                delay = float(raw_delay)
            except (ValueError, TypeError):
                delay = 1.0  # Default value
            params['delay'] = delay
        super().__init__(**params)
        self.api = LensAPI(proxy=self.proxy)
        # Starts at 0; the end of ``_respect_delay`` stores ``time.time()`` here.
        self.last_request_time = 0
@@ -254,15 +272,15 @@ class OCRLensAPI(OCRBase):
    def _ocr_blk_list(self, img: np.ndarray, blk_list: List[TextBlock], *args, **kwargs):
        im_h, im_w = img.shape[:2]
        if self.debug_mode:
            self.logger.info(f'Image size: {im_h}x{im_w}')
            self.logger.debug(f'Image size: {im_h}x{im_w}')
        for blk in blk_list:
            x1, y1, x2, y2 = blk.xyxy
            if self.debug_mode:
                self.logger.info(f'Processing block: ({x1, y1, x2, y2})')
                self.logger.debug(f'Processing block: ({x1, y1, x2, y2})')
            if y2 < im_h and x2 < im_w and x1 > 0 and y1 > 0 and x1 < x2 and y1 < y2:
                cropped_img = img[y1:y2, x1:x2]
                if self.debug_mode:
                    self.logger.info(f'Cropped image size: {cropped_img.shape}')
                    self.logger.debug(f'Cropped image size: {cropped_img.shape}')
                blk.text = self.ocr(cropped_img)
            else:
                if self.debug_mode:
@@ -276,16 +294,17 @@ class OCRLensAPI(OCRBase):
    
    def ocr(self, img: np.ndarray) -> str:
        if self.debug_mode:
            self.logger.info(f'Starting OCR on image of shape: {img.shape}')
            self.logger.debug(f'Starting OCR on image of shape: {img.shape}')
        self._respect_delay()
        try:
            if img.size > 0:  # Check if the image is not empty
                if self.debug_mode:
                    self.logger.info(f'Input image size: {img.shape}')
                    self.logger.debug(f'Input image size: {img.shape}')
                _, buffer = cv2.imencode('.jpg', img)
                result = self.api.process_image(image_buffer=buffer.tobytes(), response_method=self.response_method)
                if self.debug_mode:
                    self.logger.info(f'OCR result: {result}')
                    formatted_result = format_ocr_result(result)
                    self.logger.debug(f'OCR result: {formatted_result}')
                ignore_texts = [
                    'Full text not found in expected structure',
                    'Full text not found (or Lens could not recognize it)'
@@ -347,8 +366,13 @@ class OCRLensAPI(OCRBase):
        self.last_request_time = time.time()

    def updateParam(self, param_key: str, param_content):
        """Update one parameter, coercing ``delay`` and resetting the client on proxy change.

        Args:
            param_key: Name of the parameter being changed.
            param_content: New value. For ``delay`` it is converted to
                ``float``; a value that cannot be converted becomes ``1.0``.
        """
        if param_key == 'delay':
            try:
                param_content = float(param_content)
            except (ValueError, TypeError):
                param_content = 1.0  # Default value
        super().updateParam(param_key, param_content)
        if param_key == 'proxy':
            # When changing the proxy, recreate the client
            self.api.lens.proxy = self.proxy  # Update the proxy
            self.api.lens.client = None  # Reset the client to create a new one on the next request
+341 −0
Original line number Diff line number Diff line
import numpy as np
from typing import List
import os
import logging

try:
    from paddleocr import PaddleOCR
    PADDLE_OCR_AVAILABLE = True
except ImportError:
    PADDLE_OCR_AVAILABLE = False
    logging.warning(
        'PaddleOCR is not installed, so the module will not be initialized. \nInstall core it by following https://www.paddlepaddle.org.cn/en/install/quick?docurl \nand then run `pip install paddleocr`'
    )

import cv2
import re

from .base import OCRBase, register_OCR, DEFAULT_DEVICE, DEVICE_SELECTOR, TextBlock

# Relative directory ('data/models/paddle-ocr') under which the PaddleOCR
# model folders are placed; `_load_model` builds its det/rec/cls paths from it.
PADDLE_OCR_PATH = os.path.join('data', 'models', 'paddle-ocr')
# Export the same directory through the PPOCR_HOME environment variable.
# NOTE(review): this runs after the `paddleocr` import attempt above, so it
# cannot influence anything paddleocr reads at import time - confirm that
# PPOCR_HOME is actually honoured.
os.environ['PPOCR_HOME'] = PADDLE_OCR_PATH

if PADDLE_OCR_AVAILABLE:
    @register_OCR('paddle_ocr')
    class PaddleOCRModule(OCRBase):
        # Maps the names offered by the 'language' selector (its options are
        # the keys of this dict) to the code passed as `lang` to PaddleOCR in
        # `_load_model`.
        lang_map = {
            'Chinese & English': 'ch',
            'English': 'en',
            'French': 'fr',
            'German': 'german',
            'Japanese': 'japan',
            'Korean': 'korean',
            'Chinese Traditional': 'chinese_cht',
            'Italian': 'it',
            'Spanish': 'es',
            'Portuguese': 'pt',
            'Russian': 'ru',
            'Ukrainian': 'uk',
            'Belarusian': 'be',
            'Telugu': 'te',
            'Saudi Arabia': 'sa',
            'Tamil': 'ta',
            'Afrikaans': 'af',
            'Azerbaijani': 'az',
            'Bosnian': 'bs',
            'Czech': 'cs',
            'Welsh': 'cy',
            'Danish': 'da',
            'Dutch': 'nl',
            'Norwegian': 'no',
            'Polish': 'pl',
            'Romanian': 'ro',
            'Slovak': 'sk',
            'Slovenian': 'sl',
            'Albanian': 'sq',
            'Swedish': 'sv',
            'Swahili': 'sw',
            'Tagalog': 'tl',
            'Turkish': 'tr',
            'Uzbek': 'uz',
            'Vietnamese': 'vi',
            'Mongolian': 'mn',
            'Arabic': 'ar',
            'Hindi': 'hi',
            'Uyghur': 'ug',
            'Persian': 'fa',
            'Urdu': 'ur',
            'Serbian (Latin)': 'rs_latin',
            'Occitan': 'oc',
            'Marathi': 'mr',
            'Nepali': 'ne',
            'Serbian (Cyrillic)': 'rs_cyrillic',
            'Bulgarian': 'bg',
            'Estonian': 'et',
            'Irish': 'ga',
            'Croatian': 'hr',
            'Hungarian': 'hu',
            'Indonesian': 'id',
            'Icelandic': 'is',
            'Kurdish': 'ku',
            'Lithuanian': 'lt',
            'Latvian': 'lv',
            'Maori': 'mi',
            'Malay': 'ms',
            'Maltese': 'mt',
            'Adyghe': 'ady',
            'Kabardian': 'kbd',
            'Avar': 'ava',
            'Dargwa': 'dar',
            'Ingush': 'inh',
            'Lak': 'lbe',
            'Lezghian': 'lez',
            'Tabassaran': 'tab',
            'Bihari': 'bh',
            'Maithili': 'mai',
            'Angika': 'ang',
            'Bhojpuri': 'bho',
            'Magahi': 'mah',
            'Nagpur': 'sck',
            'Newari': 'new',
            'Goan Konkani': 'gom',
        }

        # Parameter declarations for this module. `__init__` copies every
        # entry's 'value' onto a same-named instance attribute.
        params = {
            # Display name of the OCR language; must be a key of `lang_map`.
            'language': {
                'type': 'selector',
                'options': list(lang_map.keys()),
                'value': 'English',  # Default language
                'description': 'Select the language for OCR',
            },
            # `_load_model` enables the GPU only when this value equals 'cuda'.
            'device': DEVICE_SELECTOR(),
            # Passed to PaddleOCR as `use_angle_cls` and as `cls=` on each OCR call.
            'use_angle_cls': {
                'type': 'checkbox',
                'value': False,
                'description': 'Enable angle classification for rotated text',
            },
            # Also used as a path component of the model directories.
            'ocr_version': {
                'type': 'selector',
                'options': ['PP-OCRv4', 'PP-OCRv3', 'PP-OCRv2', 'PP-OCR'],
                'value': 'PP-OCRv4',
                'description': 'Select the OCR model version',
            },
            'enable_mkldnn': {
                'type': 'checkbox',
                'value': False,
                'description': 'Enable MKL-DNN for CPU acceleration',
            },
            # The three numeric entries below declare no 'type'.
            # NOTE(review): presumably they are edited as free text, so their
            # values may arrive as strings - confirm against the base class.
            'det_limit_side_len': {
                'value': 960,
                'description': 'Maximum side length for text detection',
            },
            'rec_batch_num': {
                'value': 6,
                'description': 'Batch size for text recognition',
            },
            'drop_score': {
                'value': 0.5,
                'description': 'Confidence threshold for text recognition',
            },
            # Post-processing only: read by `_apply_text_case`, no model reload.
            'text_case': {
                'type': 'selector',
                'options': ['Uppercase', 'Capitalize Sentences', 'Lowercase'],
                'value': 'Capitalize Sentences',
                'description': 'Text case transformation',
            },
            # Post-processing only: read by `_process_result`, no model reload.
            'output_format': {
                'type': 'selector',
                'options': ['Single Line', 'As Recognized'],
                'value': 'As Recognized',
                'description': 'Text output format',
            },
        }

        # Class-level default; `__init__` replaces it on each instance with
        # the configured 'device' value.
        device = DEFAULT_DEVICE

        def __init__(self, **params) -> None:
            """Initialise the module, mirror parameter values and load the model.

            Args:
                **params: Forwarded unchanged to the base-class initialiser.
            """
            super().__init__(**params)
            # Every declared parameter is exposed as a same-named attribute.
            mirrored_keys = (
                'language',
                'device',
                'use_angle_cls',
                'ocr_version',
                'enable_mkldnn',
                'det_limit_side_len',
                'rec_batch_num',
                'drop_score',
                'text_case',
                'output_format',
            )
            for key in mirrored_keys:
                setattr(self, key, self.params[key]['value'])
            self.model = None
            self._setup_logging()
            self._load_model()

        def _setup_logging(self):
            if self.debug_mode:
                logging.getLogger('ppocr').setLevel(logging.DEBUG)
                logging.getLogger('paddleocr').setLevel(logging.DEBUG)
                logging.getLogger('predict_system').setLevel(logging.DEBUG)
            else:
                logging.getLogger('ppocr').setLevel(logging.WARNING)
                logging.getLogger('paddleocr').setLevel(logging.WARNING)
                logging.getLogger('predict_system').setLevel(logging.WARNING)

        def _load_model(self):
            """(Re)create ``self.model`` from the current attribute values.

            The numeric settings (``det_limit_side_len``, ``rec_batch_num``,
            ``drop_score``) are converted to ``int``/``float`` before being
            handed to PaddleOCR, because a configured value is not guaranteed
            to be numeric already (for example when it was typed as text).
            A value that cannot be converted is replaced by the default
            declared in ``params`` and a warning is logged.

            Raises:
                KeyError: If ``self.language`` is not a key of ``lang_map``.
            """
            def as_number(name, cast, fallback):
                # Return attribute `name` converted by `cast`, or `fallback`.
                raw = getattr(self, name)
                try:
                    return cast(raw)
                except (TypeError, ValueError, OverflowError):
                    self.logger.warning(
                        f"Invalid value {raw!r} for '{name}', using {fallback} instead"
                    )
                    return fallback

            def to_int(value):
                # Accept '960' as well as '960.0'.
                return int(float(value))

            lang_code = self.lang_map[self.language]
            use_gpu = self.device == 'cuda'
            if self.debug_mode:
                self.logger.info(f"Loading PaddleOCR model for language: {self.language} ({lang_code}), GPU: {use_gpu}")

            # Models are kept per language and per OCR version.
            model_root = os.path.join(PADDLE_OCR_PATH, lang_code, self.ocr_version)
            self.model = PaddleOCR(
                use_angle_cls=self.use_angle_cls,
                lang=lang_code,
                use_gpu=use_gpu,
                ocr_version=self.ocr_version,
                enable_mkldnn=self.enable_mkldnn,
                det_limit_side_len=as_number('det_limit_side_len', to_int, 960),
                rec_batch_num=as_number('rec_batch_num', to_int, 6),
                drop_score=as_number('drop_score', float, 0.5),
                det_model_dir=os.path.join(model_root, 'det'),
                rec_model_dir=os.path.join(model_root, 'rec'),
                # The classifier directory is only supplied when angle
                # classification is enabled.
                cls_model_dir=os.path.join(model_root, 'cls') if self.use_angle_cls else None,
            )

        def ocr_img(self, img: np.ndarray) -> str:
            """Run detection and recognition on ``img`` and return the processed text.

            Args:
                img: Image array handed directly to ``self.model.ocr``.

            Returns:
                str: The output of ``_process_result`` for the raw model result.
            """
            if self.debug_mode:
                self.logger.debug(f"Starting OCR for image size: {img.shape}")

            raw_result = self.model.ocr(img, det=True, rec=True, cls=self.use_angle_cls)

            if self.debug_mode:
                self.logger.debug(f"OCR recognition result: {raw_result}")

            return self._process_result(raw_result)

        def _ocr_blk_list(self, img: np.ndarray, blk_list: List[TextBlock], *args, **kwargs):
            """Recognise the text of every block and store it in ``blk.text``.

            Each block's ``xyxy`` box is cropped from ``img`` and sent to the
            model separately. A block gets an empty string when its box is
            empty or not fully inside the image, or when recognition raises.

            Args:
                img: Full page image; only its first two dimensions are read
                    as height and width.
                blk_list: Blocks to fill in; they are modified in place.
                *args, **kwargs: Accepted for interface compatibility, unused.
            """
            im_h, im_w = img.shape[:2]
            for blk in blk_list:
                x1, y1, x2, y2 = blk.xyxy
                if not (0 <= x1 < x2 <= im_w and 0 <= y1 < y2 <= im_h):
                    if self.debug_mode:
                        self.logger.warning('Invalid text block coordinates for target image')
                    blk.text = ''
                    continue

                cropped_img = img[y1:y2, x1:x2]
                try:
                    result = self.model.ocr(cropped_img, det=True, rec=True, cls=self.use_angle_cls)

                    if self.debug_mode:
                        # The unprocessed text is only needed for this log
                        # line, so it is extracted in debug mode only.
                        raw_texts = []
                        if isinstance(result, list) and len(result) > 0 and isinstance(result[0], list):
                            for line in result[0]:
                                if isinstance(line, list) and len(line) > 1 and isinstance(line[1], (list, tuple)) and len(line[1]) > 0:
                                    raw_texts.append(line[1][0])
                        raw_text = ' '.join(raw_texts)
                        self.logger.debug(f"Raw OCR text from block ({x1}, {y1}, {x2}, {y2}): {raw_text}")

                    text = self._process_result(result)

                    if self.debug_mode:
                        self.logger.debug(f"Processed text from block ({x1}, {y1}, {x2}, {y2}): {text}")

                    blk.text = text if text else ''

                except Exception as e:
                    # Best effort: one failing block must not abort the page.
                    # The failure is always reported so that an empty block is
                    # not silently indistinguishable from "no text found".
                    self.logger.error(f"Error recognizing block: {str(e)}")
                    blk.text = ''

        def _process_result(self, result):
            try:
                if not result or result[0] is None:
                    return ''

                if isinstance(result, list) and len(result) > 0 and isinstance(result[0], list):
                    result = result[0]

                raw_texts = []
                for line in result:
                    if isinstance(line, list) and len(line) > 1 and isinstance(line[1], (list, tuple)) and len(line[1]) > 0:
                        text = line[1][0]
                        raw_texts.append(text)

                # Depending on the output_format, we concatenate the lines
                if self.output_format == 'Single Line':
                    joined_text = ' '.join(raw_texts)
                    # Text cleaning
                    joined_text = re.sub(r'-(?!\w)', '', joined_text)
                    joined_text = re.sub(r'\s+', ' ', joined_text)
                elif self.output_format == 'As Recognized':
                    joined_text = ' '.join(raw_texts)  # Combine with spaces to create a single text
                    # Clean up text, preserve line breaks
                    joined_text = re.sub(r'-(?!\w)', '', joined_text)
                    joined_text = re.sub(r'\s+', ' ', joined_text)
                else:
                    joined_text = ' '.join(raw_texts)
                    joined_text = re.sub(r'-(?!\w)', '', joined_text)
                    joined_text = re.sub(r'\s+', ' ', joined_text)

                # Apply case conversion to all text
                processed_text = self._apply_text_case(joined_text)
                processed_text = self._apply_punctuation_and_spacing(processed_text)

                if self.debug_mode:
                    self.logger.debug(f"Final processed text: {processed_text}")

                return processed_text
            except Exception as e:
                if self.debug_mode:
                    self.logger.error(f"Error processing OCR result: {str(e)}")
                return ''

        def _apply_text_case(self, text: str) -> str:
            if self.text_case == 'Uppercase':
                return text.upper()
            elif self.text_case == 'Capitalize Sentences':
                return self._capitalize_sentences(text)
            elif self.text_case == 'Lowercase':
                return text.lower()
            else:
                return text  # No change if the mode is not recognized

        def _capitalize_sentences(self, text: str) -> str:
            def process_sentence(sentence):
                words = sentence.split()
                if not words:
                    return ''
                if len(words) == 1:
                    return words[0].capitalize()
                else:
                    return ' '.join([words[0].capitalize()] + [word.lower() for word in words[1:]])

            # We divide into sentences only by punctuation marks
            sentences = re.split(r'(?<=[.!?…])\s+', text)
            return ' '.join(process_sentence(sentence) for sentence in sentences)

        def _apply_punctuation_and_spacing(self, text: str) -> str:
            text = re.sub(r'\s+([,.!?…])', r'\1', text)
            text = re.sub(r'([,.!?…])(?!\s)(?![,.!?…])', r'\1 ', text)
            text = re.sub(r'([,.!?…])\s+([,.!?…])', r'\1\2', text)
            return text.strip()

        def updateParam(self, param_key: str, param_content):
            """Store a changed parameter and refresh the dependent state.

            A change to any model-related parameter re-reads all of them from
            ``self.params`` and reloads the model; ``text_case`` and
            ``output_format`` only refresh their own attribute.

            Args:
                param_key: Name of the parameter being changed.
                param_content: New value, passed on to the base class.
            """
            super().updateParam(param_key, param_content)

            model_keys = (
                'language',
                'device',
                'use_angle_cls',
                'ocr_version',
                'enable_mkldnn',
                'det_limit_side_len',
                'rec_batch_num',
                'drop_score',
            )
            if param_key in model_keys:
                for key in model_keys:
                    setattr(self, key, self.params[key]['value'])
                self._load_model()
            elif param_key in ('text_case', 'output_format'):
                # Post-processing options: no model reload required.
                setattr(self, param_key, self.params[param_key]['value'])
else:
    # If PaddleOCR is not installed, you can define a stub or alternative module
    logging.info('PaddleOCR module will not be loaded as the library is not installed.')
+4 −0
Original line number Diff line number Diff line
@@ -45,6 +45,7 @@ multivolumefile
httpx[socks,brotli]
langdetect
translators
paddleocr
json5
httpx-socks
pywin32; sys_platform == 'win32'
@@ -55,3 +56,6 @@ pyobjc-framework-cocoa; sys_platform == 'darwin'
pyobjc-framework-coreml; sys_platform == 'darwin'
pyobjc-framework-quartz; sys_platform == 'darwin'
pyobjc-framework-vision; sys_platform == 'darwin'
#paddleocr
#paddlepaddle - one of these. If you have a CUDA video card, then uncomment paddlepaddle-gpu
#paddlepaddle-gpu
 No newline at end of file