Commit ae4161c8 authored by narugo1992's avatar narugo1992
Browse files

dev(narugo): add ccip and its benchmark

parent 632f2066
Loading
Loading
Loading
Loading
+58 −0
Original line number Diff line number Diff line
import random

from benchmark import BaseBenchmark, create_plot_cli
from imgutils.metrics.ccip import get_ccip_features, get_ccip_similarity, _VALID_MODEL_NAMES


class CCIPFeatureBenchmark(BaseBenchmark):
    def __init__(self, model_name):
        BaseBenchmark.__init__(self)
        self.model_name = model_name

    def load(self):
        from imgutils.metrics.ccip import _open_feat_model
        _ = _open_feat_model(self.model_name)

    def unload(self):
        from imgutils.metrics.ccip import _open_feat_model
        _open_feat_model.cache_clear()

    def run(self):
        image_file = random.choice(self.all_images)
        _ = get_ccip_features([image_file], model_name=self.model_name)


class CCIPDiffBenchmark(BaseBenchmark):
    def __init__(self, model_name):
        BaseBenchmark.__init__(self)
        self.model_name = model_name

    def prepare(self):
        self.feats = list(get_ccip_features(random.sample(self.all_images, k=30), model_name=self.model_name))

    def load(self):
        from imgutils.metrics.ccip import _open_metric_model
        _ = _open_metric_model(self.model_name)

    def unload(self):
        from imgutils.metrics.ccip import _open_metric_model
        _open_metric_model.cache_clear()

    def run(self):
        feat1 = random.choice(self.feats)
        feat2 = random.choice(self.feats)
        _ = get_ccip_similarity(feat1, feat2, model_name=self.model_name)


if __name__ == '__main__':
    bms = []
    for model_name in _VALID_MODEL_NAMES:
        bms.append((f'{model_name} extract', CCIPFeatureBenchmark(model_name)))
        bms.append((f'{model_name} metrics', CCIPDiffBenchmark(model_name)))

    create_plot_cli(
        bms,
        title='Benchmark for CCIP Models',
        run_times=10,
        try_times=20,
    )()
+90 −0
Original line number Diff line number Diff line
from functools import lru_cache
from typing import Union, List

import numpy as np
from PIL import Image
from huggingface_hub import hf_hub_download

from ..data import MultiImagesTyping, load_images, ImageTyping
from ..utils import open_onnx_model

__all__ = [
    'get_ccip_features',
    'get_ccip_similarity',
    'batch_ccip_similarity',
]


def _normalize(data, mean=(0.48145466, 0.4578275, 0.40821073), std=(0.26862954, 0.26130258, 0.27577711)):
    mean, std = np.asarray(mean), np.asarray(std)
    return (data - mean[:, None, None]) / std[:, None, None]


def _preprocess_image(image: Image.Image, size: int = 384):
    image = image.resize((size, size), resample=Image.BILINEAR)
    # noinspection PyTypeChecker
    data = np.array(image).transpose(2, 0, 1).astype(np.float32) / 255.0
    data = _normalize(data)

    return data


@lru_cache()
def _open_feat_model(model_name):
    return open_onnx_model(hf_hub_download(
        f'deepghs/imgutils-models',
        f'ccip/{model_name}_feat.onnx',
    ))


@lru_cache()
def _open_metric_model(model_name):
    return open_onnx_model(hf_hub_download(
        f'deepghs/imgutils-models',
        f'ccip/{model_name}_metrics.onnx',
    ))


_VALID_MODEL_NAMES = [
    'ccip-caformer-4_fp32',
    'ccip-caformer-2_fp32',
]
_DEFAULT_MODEL_NAMES = _VALID_MODEL_NAMES[0]


def get_ccip_features(images: MultiImagesTyping, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES):
    images = load_images(images, mode='RGB')
    data = np.stack([_preprocess_image(item, size=size) for item in images]).astype(np.float32)
    output, = _open_feat_model(model_name).run(['output'], {'input': data})
    return output


def _preprocess_feats(x, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES):
    if isinstance(x, np.ndarray):
        return x
    elif isinstance(x, (list, tuple)):
        feats = []
        for item in x:
            if isinstance(item, np.ndarray):
                feats.append(item)
            else:
                feats.append(get_ccip_features(load_images([item]), size, model_name)[0])

        return np.stack(feats)
    else:
        raise TypeError(f'Unknown feature batch type - {x!r}.')


_FeatureOrImage = Union[ImageTyping, np.ndarray]


def get_ccip_similarity(x: _FeatureOrImage, y: _FeatureOrImage,
                        size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> float:
    return batch_ccip_similarity([x, y], size, model_name)[0, 1].item()


def batch_ccip_similarity(images: Union[np.ndarray, List[_FeatureOrImage]],
                          size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES):
    input_ = _preprocess_feats(images, size, model_name).astype(np.float32)
    output, = _open_metric_model(model_name).run(['output'], {'input': input_})
    return output