Commit 5e9716df authored by narugo1992's avatar narugo1992
Browse files

dev(narugo): basic complete this

parent cee698bb
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
import random

from benchmark import BaseBenchmark, create_plot_cli
from imgutils.metrics.ccip import batch_ccip_features, get_ccip_difference, _VALID_MODEL_NAMES
from imgutils.metrics.ccip import ccip_batch_extract_features, ccip_difference, _VALID_MODEL_NAMES


class CCIPFeatureBenchmark(BaseBenchmark):
@@ -19,7 +19,7 @@ class CCIPFeatureBenchmark(BaseBenchmark):

    def run(self):
        image_file = random.choice(self.all_images)
        _ = batch_ccip_features([image_file], model_name=self.model_name)
        _ = ccip_batch_extract_features([image_file], model_name=self.model_name)


class CCIPDiffBenchmark(BaseBenchmark):
@@ -28,7 +28,7 @@ class CCIPDiffBenchmark(BaseBenchmark):
        self.model_name = model_name

    def prepare(self):
        self.feats = list(batch_ccip_features(random.sample(self.all_images, k=30), model_name=self.model_name))
        self.feats = list(ccip_batch_extract_features(random.sample(self.all_images, k=30), model_name=self.model_name))

    def load(self):
        from imgutils.metrics.ccip import _open_metric_model
@@ -41,7 +41,7 @@ class CCIPDiffBenchmark(BaseBenchmark):
    def run(self):
        feat1 = random.choice(self.feats)
        feat2 = random.choice(self.feats)
        _ = get_ccip_difference(feat1, feat2, model_name=self.model_name)
        _ = ccip_difference(feat1, feat2, model_name=self.model_name)


if __name__ == '__main__':
+0 −2523

File deleted.

Preview size limit exceeded, changes collapsed.

+92 −28
Original line number Diff line number Diff line
import json
from functools import lru_cache
from typing import Union, List
from typing import Union, List, Optional, Tuple

import numpy as np
from PIL import Image
from huggingface_hub import hf_hub_download
from sklearn.cluster import DBSCAN, OPTICS
from tqdm.auto import tqdm

try:
    from typing import Literal
except (ModuleNotFoundError, ImportError):
    from typing_extensions import Literal

from ..data import MultiImagesTyping, load_images, ImageTyping
from ..utils import open_onnx_model

__all__ = [
    'get_ccip_feature',
    'batch_ccip_features',
    'get_ccip_difference',
    'batch_ccip_differences',
    'ccip_extract_feature',
    'ccip_batch_extract_features',

    'ccip_default_threshold',
    'ccip_difference',
    'ccip_same',
    'ccip_batch_differences',
    'ccip_batch_same',

    'ccip_default_clustering_params',
    'ccip_clustering',
]


@@ -59,46 +73,96 @@ def _open_cluster_metrics(model_name):
        return json.load(f)


# Names of the CCIP models known to this module.
# NOTE(review): presumably each name has published model artifacts — confirm.
_VALID_MODEL_NAMES = [
    'ccip-caformer-24-randaug-pruned',
    'ccip-caformer-6-randaug-pruned_fp32',
    'ccip-caformer-5_fp32',
]
# Model used by the public helpers when the caller does not pass ``model_name``.
_DEFAULT_MODEL_NAMES = 'ccip-caformer-24-randaug-pruned'


def get_ccip_feature(image: ImageTyping, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES):
    return batch_ccip_features([image], size, model_name)[0]
def ccip_extract_feature(image: ImageTyping, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES):
    """
    Extract the CCIP feature of a single image.

    Wraps the image in a one-element batch, runs the batch extractor and
    returns the first (only) row of its output.

    :param image: Image to extract the feature from.
    :param size: Forwarded to the batch extractor as the preprocessing size.
    :param model_name: Name of the CCIP model to use.
    :return: The first row of the batch extractor's output.
    """
    batch_features = ccip_batch_extract_features([image], size, model_name)
    return batch_features[0]


def batch_ccip_features(images: MultiImagesTyping, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES):
def ccip_batch_extract_features(images: MultiImagesTyping, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES):
    """
    Extract CCIP features for a batch of images.

    :param images: Images to process; they are loaded in ``RGB`` mode.
    :param size: Passed to the per-image preprocessing step.
    :param model_name: Name of the CCIP model whose feature network is run.
    :return: The ``'output'`` tensor produced by the feature model for the
        stacked, ``float32`` batch.
    """
    loaded = load_images(images, mode='RGB')
    preprocessed = [_preprocess_image(img, size=size) for img in loaded]
    batch = np.stack(preprocessed).astype(np.float32)

    session = _open_feat_model(model_name)
    (features,) = session.run(['output'], {'input': batch})
    return features


def _preprocess_feats(x, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES):
    if isinstance(x, np.ndarray):
_FeatureOrImage = Union[ImageTyping, np.ndarray]


def _p_feature(x: _FeatureOrImage, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES):
    if isinstance(x, np.ndarray):  # if feature
        return x
    elif isinstance(x, (list, tuple)):
        feats = []
        for item in x:
            if isinstance(item, np.ndarray):
                feats.append(item)
            else:
                feats.append(batch_ccip_features(load_images([item]), size, model_name)[0])
    else:  # is image or path
        return ccip_extract_feature(x, size, model_name)

        return np.stack(feats)
    else:
        raise TypeError(f'Unknown feature batch type - {x!r}.')

def ccip_default_threshold(model_name: str = _DEFAULT_MODEL_NAMES) -> float:
    """
    Look up the default difference threshold recorded for a CCIP model.

    :param model_name: Name of the CCIP model whose metrics are consulted.
    :return: The value stored under the ``'threshold'`` key of the model's metrics.
    """
    metrics = _open_metrics(model_name)
    return metrics['threshold']

_FeatureOrImage = Union[ImageTyping, np.ndarray]

def ccip_difference(x: _FeatureOrImage, y: _FeatureOrImage,
                    size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> float:
    """
    Compute the CCIP difference between two items.

    :param x: First item, either an image or an already-extracted feature array.
    :param y: Second item, either an image or an already-extracted feature array.
    :param size: Forwarded to the batch difference computation.
    :param model_name: Name of the CCIP model to use.
    :return: The ``[0, 1]`` entry of the batch difference output for ``[x, y]``,
        converted to a Python scalar.
    """
    pair_differences = ccip_batch_differences([x, y], size, model_name)
    # Row 0 / column 1 pairs the first input with the second one.
    return pair_differences[0, 1].item()


def get_ccip_difference(x: _FeatureOrImage, y: _FeatureOrImage,
def ccip_same(x: _FeatureOrImage, y: _FeatureOrImage, threshold: Optional[float] = None,
              size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> float:
    return batch_ccip_differences([x, y], size, model_name)[0, 1].item()
    diff = ccip_difference(x, y, size, model_name)
    threshold = threshold if threshold is not None else ccip_default_threshold(model_name)
    return diff <= threshold


def batch_ccip_differences(images: Union[np.ndarray, List[_FeatureOrImage]],
                           size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES):
    input_ = _preprocess_feats(images, size, model_name).astype(np.float32)
def ccip_batch_differences(images: List[_FeatureOrImage],
                           size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> np.ndarray:
    """
    Run the CCIP metric model over a batch of images and/or features.

    :param images: Items to compare; each is either an image or an
        already-extracted feature array.
    :param size: Used when a feature has to be extracted from an image.
    :param model_name: Name of the CCIP model to use.
    :return: The ``'output'`` tensor of the metric model for the stacked,
        ``float32`` feature batch.
    """
    features = [_p_feature(item, size, model_name) for item in images]
    stacked = np.stack(features).astype(np.float32)

    session = _open_metric_model(model_name)
    (differences,) = session.run(['output'], {'input': stacked})
    return differences


def ccip_batch_same(images: List[_FeatureOrImage], threshold: Optional[float] = None,
                    size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> np.ndarray:
    """
    Decide, for a batch, which pairs fall within the difference threshold.

    :param images: Items to compare; each is either an image or an
        already-extracted feature array.
    :param threshold: Maximum difference accepted; when ``None`` the model's
        default threshold is used.
    :param size: Forwarded to the batch difference computation.
    :param model_name: Name of the CCIP model to use.
    :return: Element-wise result of ``differences <= threshold``.
    """
    differences = ccip_batch_differences(images, size, model_name)
    if threshold is None:
        threshold = ccip_default_threshold(model_name)
    return differences <= threshold


# Clustering modes accepted by the CCIP clustering helpers. 'dbscan' is the
# default mode of those helpers, so it has to be a member of this Literal
# (it was previously misspelled as 'dbscane').
CCIPClusterModeTyping = Literal['dbscan', 'dbscan_2', 'dbscan_free', 'optics']


def ccip_default_clustering_params(model_name: str = _DEFAULT_MODEL_NAMES,
                                   mode: CCIPClusterModeTyping = 'dbscan') -> Tuple[float, int]:
    """
    Return the default ``(eps, min_samples)`` pair for a clustering mode.

    :param model_name: Name of the CCIP model whose defaults are looked up.
    :param mode: Clustering mode. ``'dbscan'`` uses the model's default
        threshold with ``min_samples`` fixed at 2; any other mode is looked
        up by name in the model's cluster metrics.
    :return: Tuple of ``(eps, min_samples)``.
    """
    if mode != 'dbscan':
        mode_params = _open_cluster_metrics(model_name)[mode]
        return mode_params['eps'], mode_params['min_samples']

    return ccip_default_threshold(model_name), 2


def ccip_clustering(images: List[_FeatureOrImage], mode: CCIPClusterModeTyping = 'dbscan',
                    eps: Optional[float] = None, min_samples: Optional[int] = None,
                    size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> List[int]:
    """
    Cluster images (or pre-extracted features) by their pairwise CCIP differences.

    :param images: Items to cluster; each is either an image or an
        already-extracted feature array.
    :param mode: Clustering mode. Modes whose name contains ``'dbscan'`` use
        ``DBSCAN``; ``'optics'`` uses ``OPTICS``.
    :param eps: Passed to ``DBSCAN`` as ``eps`` or to ``OPTICS`` as ``max_eps``.
        When ``None`` the default for ``model_name``/``mode`` is used.
    :param min_samples: ``min_samples`` for the clustering algorithm. When
        ``None`` the default for ``model_name``/``mode`` is used.
    :param size: Used when a feature has to be extracted from an image.
    :param model_name: Name of the CCIP model to use.
    :return: The fitted estimator's ``labels_`` converted to a plain list,
        one label per input item.
    :raises ValueError: If ``mode`` selects neither the DBSCAN nor the OPTICS branch.
    """
    _default_eps, _default_min_samples = ccip_default_clustering_params(model_name, mode)
    # Compare against None (as the threshold helpers do) so an explicitly
    # passed value is never silently replaced by the default.
    eps = eps if eps is not None else _default_eps
    min_samples = min_samples if min_samples is not None else _default_min_samples

    # Extract every feature once up front; the arrays are then passed through
    # unchanged when the pairwise difference matrix is computed.
    feats = [_p_feature(img, size, model_name) for img in tqdm(images, desc='Extract features')]
    batch_diff = ccip_batch_differences(feats, size, model_name)

    def _metric(x, y):
        # Each sample is a one-element vector holding the item's index, so the
        # "distance" is a lookup in the precomputed difference matrix.
        # Index explicitly: int() on a 1-D array is deprecated in NumPy >= 1.25.
        return batch_diff[int(x[0]), int(y[0])].item()

    samples = np.arange(len(feats)).reshape(-1, 1)
    if 'dbscan' in mode:
        clustering = DBSCAN(eps=eps, min_samples=min_samples, metric=_metric).fit(samples)
    elif mode == 'optics':
        clustering = OPTICS(max_eps=eps, min_samples=min_samples, metric=_metric).fit(samples)
    else:
        raise ValueError(f'Unknown mode for CCIP clustering - {mode!r}.')

    return clustering.labels_.tolist()
+12 −12
Original line number Diff line number Diff line
@@ -208,18 +208,18 @@ def export_model_to_dir(file_in_repo: str, output_dir: str, repository: str = 'd
    with open(metrics_file, 'w') as f:
        json.dump(metrics, fp=f, indent=4, sort_keys=True, ensure_ascii=False)

    # clustering_file = os.path.join(output_dir, 'cluster.json')
    # logging.info(f'Creating clustering measurement {clustering_file!r} ...')
    # c_results = {}
    # for cname, method, xrange in [
    #     ('dbscan_free', 'dbscan', (2, 5)),
    #     ('dbscan_2', 'dbscan', (2, 2)),
    #     ('optics', 'optics', (2, 5)),
    # ]:
    #     params, score = clustering_metrics(dist, cids, method=method, min_samples_range=xrange)
    #     c_results[cname] = {**params, 'score': score}
    # with open(clustering_file, 'w') as f:
    #     json.dump(c_results, fp=f, indent=4, sort_keys=True, ensure_ascii=False)
    clustering_file = os.path.join(output_dir, 'cluster.json')
    logging.info(f'Creating clustering measurement {clustering_file!r} ...')
    c_results = {}
    for cname, method, xrange in [
        ('dbscan_free', 'dbscan', (2, 5)),
        ('dbscan_2', 'dbscan', (2, 2)),
        ('optics', 'optics', (2, 5)),
    ]:
        params, score = clustering_metrics(dist, cids, method=method, min_samples_range=xrange)
        c_results[cname] = {**params, 'score': score}
    with open(clustering_file, 'w') as f:
        json.dump(c_results, fp=f, indent=4, sort_keys=True, ensure_ascii=False)

    for name, img in plots.items():
        plt_file = os.path.join(output_dir, f'plt_{name}.png')