Loading Makefile +4 −4 Original line number Diff line number Diff line Loading @@ -55,8 +55,8 @@ dataset: if [ ! -d ${DATASET_DIR}/monochrome_danbooru ]; then \ git clone https://${HF_NARUGO_USERNAME}:${HF_NARUGO_PASSWORD}@huggingface.co/datasets/deepghs/monochrome_danbooru.git ${DATASET_DIR}/monochrome_danbooru; \ fi if [ ! -d ${DATASET_DIR}/images_xtiny_v0 ]; then \ mkdir -p ${DATASET_DIR}/images_xtiny_v0 && \ wget -O ${DATASET_DIR}/images_xtiny_v0/images_xtiny_v0.tar.xz https://huggingface.co/datasets/deepghs/character_similarity/resolve/main/images_xtiny_v0.tar.xz && \ cd ${DATASET_DIR}/images_xtiny_v0 && tar -xvf images_xtiny_v0.tar.xz && rm -rf *.tar.xz; \ if [ ! -d ${DATASET_DIR}/images_test_v1 ]; then \ mkdir -p ${DATASET_DIR}/images_test_v1 && \ curl -L -o ${DATASET_DIR}/images_test_v1/images_test_v1.tar.xz https://huggingface.co/datasets/deepghs/character_similarity/resolve/main/images_test_v1.tar.xz && \ cd ${DATASET_DIR}/images_test_v1 && tar -xvf images_test_v1.tar.xz && rm -rf *.tar.xz; \ fi zoo/ccip/__main__.py +27 −138 Original line number Diff line number Diff line import glob import os.path import random import tempfile from functools import partial from typing import Optional, Tuple from typing import Optional import click import torch from ditk import logging from hbutils.testing import disable_output from huggingface_hub import hf_hub_download from sklearn import svm from sklearn.metrics import accuracy_score from torchvision import transforms from tqdm.auto import tqdm from imgutils.data import load_image from .dataset import TEST_TRANSFORM from .demo import _get_model_from_ckpt from .model import CCIP from .onnx import export_feat_model_to_onnx, export_metrics_model_to_onnx from .onnx import export_feat_model_to_onnx, export_metrics_model_to_onnx, get_scale_for_model, \ export_full_model_to_onnx from ..utils import GLOBAL_CONTEXT_SETTINGS from ..utils import print_version as _origin_print_version Loading @@ -35,95 +27,12 @@ def cli(): _CHECK_ITEMS = { 'full': export_full_model_to_onnx, 'feat': export_feat_model_to_onnx, 'metrics': export_metrics_model_to_onnx, } def _sample_analysis(poss, negs, svm_samples: int = 10000): poss_cnt, negs_cnt = poss.shape[0], negs.shape[0] total = poss_cnt + negs_cnt if total > svm_samples: s_poss = poss[random.sample(range(poss_cnt), k=int(round(poss_cnt * svm_samples / total)))] s_negs = negs[random.sample(range(negs_cnt), k=int(round(negs_cnt * svm_samples / total)))] else: s_poss, s_negs = poss, negs s_poss, s_negs = s_poss.cpu(), s_negs.cpu() features = torch.cat([s_poss, s_negs]).detach().numpy() labels = torch.cat([torch.ones_like(s_poss), -torch.ones_like(s_negs)]).detach().numpy() model = svm.SVC(kernel='linear') # 线性核 model.fit(features.reshape(-1, 1), labels) predictions = model.predict(features.reshape(-1, 1)) coef = model.coef_.reshape(-1)[0].tolist() inter = model.intercept_.reshape(-1)[0].tolist() threshold = -inter / coef return poss.mean().item(), poss.std().item(), \ negs.mean().item(), negs.std().item(), \ threshold, accuracy_score(labels, predictions) def _sample_safe_threshold(poss, negs, precision: float = 0.98) -> Tuple[float, float]: items = sorted([ *((v, 1) for v in poss), *((v, 0) for v in negs), ], key=lambda x: (-x[0], -x[1])) pos_cnt, neg_cnt = 0, 0 r_threshold, r_precision = None, None for i, (v, label) in enumerate(items): if label == 0: neg_cnt += 1 else: pos_cnt += 1 current_precision = pos_cnt / (pos_cnt + neg_cnt) if r_threshold is None or current_precision >= precision or current_precision > r_precision: if i == len(items) - 1: r_threshold = v else: v_next, _ = items[i + 1] r_threshold = (v + v_next) / 2 r_precision = current_precision return r_threshold, r_precision @torch.no_grad() def get_threshold_for_model(model: CCIP, preprocess, samples: int = 200, safe_precision: float = 0.98) \ -> Tuple[float, float, float, float]: def _get_sim(x, y): x, y = load_image(x, mode='RGB'), load_image(y, mode='RGB') input_ = torch.stack([preprocess(x), preprocess(y)]) return model(input_)[0][1] dataset_dir = 'test/testfile/dataset/images_xtiny_v0/' all_images = glob.glob(os.path.join(dataset_dir, '*', '*', '*.jpg')) all_chs = sorted(set([os.path.dirname(img) for img in all_images])) not_same_samples = [] for _ in tqdm(range(samples)): x_ch, y_ch = random.sample(all_chs, k=2) x_img = random.choice(glob.glob(os.path.join(x_ch, '*.jpg'))) y_img = random.choice(glob.glob(os.path.join(y_ch, '*.jpg'))) not_same_samples.append(_get_sim(x_img, y_img)) not_same_samples = torch.as_tensor(not_same_samples) same_samples = [] for _ in tqdm(range(samples)): ch = random.choice(all_chs) x_img, y_img = random.sample(glob.glob(os.path.join(ch, '*.jpg')), k=2) same_samples.append(_get_sim(x_img, y_img)) same_samples = torch.as_tensor(same_samples) _, _, _, _, threshold, accuracy = _sample_analysis(same_samples, not_same_samples, svm_samples=samples) safe_threshold, safe_prec = _sample_safe_threshold(same_samples, not_same_samples, precision=safe_precision) return threshold, accuracy, safe_threshold, safe_prec @cli.command('onnx_check', help='Check onnx export is okay or not') @click.option('--model', '-m', 'model', type=str, required=True, help='Model to be checked. ', show_default=True) Loading @@ -131,40 +40,28 @@ def get_threshold_for_model(model: CCIP, preprocess, samples: int = 200, safe_pr help='Show verbose information.', show_default=True) @click.option('--output_dir', '-O', 'output_dir', type=click.Path(file_okay=False), default=None, help='Output directory of all models.', show_default=True) @click.option('--threshold_samples', '-T', 'threshold_samples', type=int, default=500, help='Batch of samples to find threshold.', show_default=True) def onnx_check(model: str, verbose: bool = False, output_dir: Optional[str] = None, threshold_samples: int = 500): def onnx_check(model: str, verbose: bool = False, output_dir: Optional[str] = None): logging.try_init_root(logging.INFO) model, model_name = CCIP(model), model model.eval() logging.info('Finding threshold ...') threshold_mean, accuracy_mean, threshold_safe, precision_safe = get_threshold_for_model( model, transforms.Compose(TEST_TRANSFORM + model.preprocess), samples=threshold_samples, ) logging.info(f'Threshold: {threshold_mean:.4f}, accuracy: {accuracy_mean * 100.0:.2f}%') logging.info(f'Safe threshold: {threshold_safe:.4f}, accuracy: {precision_safe * 100.0:.2f}%') scale = get_scale_for_model(model) logging.info(f'Scale: {scale:.4f}') with tempfile.TemporaryDirectory() as td: for item, safe, threshold in [ ('feat', False, threshold_mean), ('metrics', False, threshold_mean), ('metrics', True, threshold_safe), ]: click.echo(click.style(f'Try exporting {model_name}(safe={safe!r})-->{item} to onnx ... '), nl=False) onnx_filename = os.path.join(output_dir or td, f'{model_name}_{"safe_" if safe else ""}{item}.onnx') for item in ['feat', 'metrics']: click.echo(click.style(f'Try exporting {model_name}-->{item} to onnx ... '), nl=False) onnx_filename = os.path.join(output_dir or td, f'{model_name}_{item}.onnx') export_func = _CHECK_ITEMS[item] try: model = CCIP(model_name) # necessary if verbose: export_func(model, threshold, onnx_filename, verbose=verbose) export_func(model, scale, onnx_filename, verbose=verbose) else: with disable_output(): export_func(model, threshold, onnx_filename, verbose=verbose) export_func(model, scale, onnx_filename, verbose=verbose) except: click.echo(click.style('FAILED', fg='red'), nl=True) raise Loading @@ -176,50 +73,42 @@ MODELS = [ # ('caformer', 'ccip-caformer-2_fp32.ckpt'), # ('caformer', 'ccip-caformer-4_fp32.ckpt'), # ('caformer', 'ccip-caformer-5_fp32.ckpt'), ('caformer', 'ccip-caformer-23_randaug_fp32.ckpt'), # ('caformer', 'ccip-caformer-23_randaug_fp32.ckpt'), ('caformer', 'ccip-caformer-24-randaug-pruned.ckpt'), ('caformer_query', 'ccip-caformer_query-12.ckpt'), ] @cli.command('export', help='Export all models as onnx.', context_settings={**GLOBAL_CONTEXT_SETTINGS}) @click.option('--repository', '-r', 'repository', type=str, default='deepghs/ccip', help='Source repository.', show_default=True) @click.option('--output_dir', '-O', 'output_dir', type=click.Path(file_okay=False), required=True, help='Output directory of all models.', show_default=True) @click.option('--verbose', '-V', 'verbose', is_flag=True, type=bool, default=False, help='Show verbose information.', show_default=True) @click.option('--threshold_samples', '-T', 'threshold_samples', type=int, default=500, help='Batch of samples to find threshold.', show_default=True) def export(output_dir: str, verbose: bool = False, threshold_samples: int = 500): def export(repository: str, output_dir: str, verbose: bool = False): for model_name, ckpt_name in MODELS: ckpt_file = hf_hub_download('deepghs/ccip', ckpt_name, repo_type='model') ckpt_file = hf_hub_download(repository, ckpt_name, repo_type='model') model, preprocess = _get_model_from_ckpt(model_name, ckpt_file, device='cpu', fp16=False) ckpt_body, _ = os.path.splitext(ckpt_name) logging.info(f'Finding threshold for {ckpt_name!r} ...') threshold_mean, accuracy_mean, threshold_safe, precision_safe = get_threshold_for_model( model, transforms.Compose(TEST_TRANSFORM + model.preprocess), samples=threshold_samples, ) logging.info(f'Threshold for {ckpt_file!r}: {threshold_mean:.4f}, accuracy: {accuracy_mean * 100.0:.2f}%') logging.info(f'Safe threshold for {ckpt_file!r}: {threshold_safe:.4f}, accuracy: {precision_safe * 100.0:.2f}%') scale = get_scale_for_model(model) logging.info(f'Scale for {ckpt_file!r}: {scale:.4f}') with tempfile.TemporaryDirectory() as td: for item, safe, threshold in [ ('feat', False, threshold_mean), ('metrics', False, threshold_mean), ('metrics', True, threshold_safe), ]: click.echo(click.style(f'Try exporting {ckpt_body!r}({model_name}, ' f'safe={safe!r})-->{item} to onnx ... '), nl=False) onnx_filename = os.path.join(output_dir or td, f'{ckpt_body}_{"safe_" if safe else ""}{item}.onnx') for item in ['feat', 'metrics']: click.echo(click.style(f'Try exporting {ckpt_body!r}({model_name})' f'-->{item} to onnx ... '), nl=False) onnx_filename = os.path.join(output_dir or td, f'{ckpt_body}_{item}.onnx') export_func = _CHECK_ITEMS[item] try: model, preprocess = _get_model_from_ckpt(model_name, ckpt_file, device='cpu', fp16=False) if verbose: export_func(model, threshold, onnx_filename, verbose=verbose) export_func(model, scale, onnx_filename, verbose=verbose) else: with disable_output(): export_func(model, threshold, onnx_filename, verbose=verbose) export_func(model, scale, onnx_filename, verbose=verbose) except: click.echo(click.style('FAILED', fg='red'), nl=True) raise Loading zoo/ccip/model.py +5 −7 Original line number Diff line number Diff line Loading @@ -2,7 +2,6 @@ import numpy as np import torch.nn from torch import nn # from zoo.utils import get_testfile from .backbone import get_backbone Loading Loading @@ -53,15 +52,14 @@ class CCIP(nn.Module): return x class LogitToConfidence(nn.Module): def __init__(self, threshold): class LogitToDiff(nn.Module): def __init__(self, scale): nn.Module.__init__(self) self.register_buffer('threshold', torch.tensor(threshold)) self.threshold: torch.Tensor self.register_buffer('scale', torch.tensor(scale)) self.scale: torch.Tensor def forward(self, x): ex = x - self.threshold return torch.exp(ex) / (torch.exp(ex) + 1.0) return (self.scale - x) / (self.scale * 2) if __name__ == '__main__': Loading zoo/ccip/onnx.py +26 −11 Original line number Diff line number Diff line Loading @@ -8,18 +8,18 @@ from torch import nn from torchvision import transforms from .dataset import TEST_TRANSFORM from .model import CCIP, LogitToConfidence from .model import CCIP, LogitToDiff from ..utils import get_testfile, onnx_optimize class ModelWithConfidence(nn.Module): def __init__(self, model, threshold): class ModelWithScaleAlign(nn.Module): def __init__(self, model, scale): nn.Module.__init__(self) self.model = model self.logit_to_conf = LogitToConfidence(threshold) self.logit_to_diff = LogitToDiff(scale) def forward(self, x): return self.logit_to_conf(self.model(x)) return torch.clip(self.logit_to_diff(self.model(x)), min=0.0, max=1.0) def get_batch_images(preprocess) -> torch.Tensor: Loading @@ -36,6 +36,21 @@ def get_batch_images(preprocess) -> torch.Tensor: ]) @torch.no_grad() def get_scale_for_model(model: CCIP): example_input = get_batch_images(model.preprocess) model = model.float() if torch.cuda.is_available(): example_input = example_input.cuda() model = model.cuda() else: example_input = example_input.cpu() model = model.cpu() dist = model(example_input) return dist[0, 0].detach().cpu().item() def _onnx_export(model, example_input, onnx_filename, opset_version: int = 14, verbose: bool = True, no_optimize: bool = False, dynamic_axes=None): model = model.float() Loading Loading @@ -70,11 +85,11 @@ def _onnx_export(model, example_input, onnx_filename, opset_version: int = 14, v onnx.save(model, onnx_filename) def export_full_model_to_onnx(model: CCIP, threshold: float, onnx_filename, opset_version: int = 14, def export_full_model_to_onnx(model: CCIP, scale: float, onnx_filename, opset_version: int = 14, verbose: bool = True, no_optimize: bool = False): example_input = get_batch_images(model.preprocess) return _onnx_export( ModelWithConfidence(model, threshold), example_input, ModelWithScaleAlign(model, scale), example_input, onnx_filename, opset_version, verbose, no_optimize, dynamic_axes={ "input": {0: "batch"}, Loading @@ -83,9 +98,9 @@ def export_full_model_to_onnx(model: CCIP, threshold: float, onnx_filename, opse ) def export_feat_model_to_onnx(model: CCIP, threshold: float, onnx_filename, opset_version: int = 14, def export_feat_model_to_onnx(model: CCIP, scale: float, onnx_filename, opset_version: int = 14, verbose: bool = True, no_optimize: bool = False): _ = threshold _ = scale example_input = get_batch_images(model.preprocess) return _onnx_export( model.feature, example_input, Loading @@ -97,14 +112,14 @@ def export_feat_model_to_onnx(model: CCIP, threshold: float, onnx_filename, opse ) def export_metrics_model_to_onnx(model: CCIP, threshold: float, onnx_filename, opset_version: int = 14, def export_metrics_model_to_onnx(model: CCIP, scale: float, onnx_filename, opset_version: int = 14, verbose: bool = True, no_optimize: bool = False): origin = get_batch_images(model.preprocess) with torch.no_grad(): example_input = model.feature(origin) return _onnx_export( ModelWithConfidence(model.metrics, threshold), example_input, ModelWithScaleAlign(model.metrics, scale), example_input, onnx_filename, opset_version, verbose, no_optimize, dynamic_axes={ "input": {0: "batch"}, Loading zoo/ccip/plot.py 0 → 100644 +154 −0 Original line number Diff line number Diff line import os from typing import Tuple import numpy as np import torch from PIL import Image from hbutils.random import keep_global_state from hbutils.system import TemporaryDirectory from matplotlib import pyplot as plt from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay from sklearn.metrics import f1_score, precision_score, recall_score, precision_recall_curve, PrecisionRecallDisplay from sklearn.metrics import roc_curve, auc, RocCurveDisplay try: from typing import Literal except (ImportError, ModuleNotFoundError): from typing_extensions import Literal def _pos_neg_to_true_score(pos, neg): y_true = np.concatenate([np.ones_like(pos), np.zeros_like(neg)]) y_value = np.concatenate([pos, neg]) return y_true, y_value def plt_confusion_matrix(ax, y_true, y_pred, title: str = 'Confusion Matrix', normalize: Literal['true', 'pred', None] = None, cmap=None): cm = confusion_matrix(y_true, y_pred, normalize=normalize) disp = ConfusionMatrixDisplay( confusion_matrix=cm, display_labels=['Diff', 'Sim'], ) disp.plot(ax=ax, cmap=cmap or plt.cm.Blues) ax.set_yticklabels(ax.get_yticklabels(), rotation=90) ax.set_title(title) @keep_global_state() def _create_score_curve(ax, name, func, pos, neg, title=None, units: int = 500, xrange: Tuple[float, float] = (0.0, 1.0)): y_true, y_score = _pos_neg_to_true_score(pos, neg) xs, ys = [], [] scores = np.sort(y_score, kind='heapsort') if len(scores) > units: scores = np.random.choice(scores, units) for score in np.sort(scores, kind='heapsort'): _y_pred = y_score >= score precision = func(y_true, _y_pred, zero_division=1) xs.append(score) ys.append(precision) xs = np.array(xs) ys = np.array(ys) maxj = np.argmax(ys) ax.plot(xs, ys, label=f'{ys[maxj]:.2f} at {xs[maxj]:.3f}') ax.set_xlabel(f'score') ax.set_ylabel(f'{name}') ax.set_xlim(xrange) ax.set_ylim([0.0, 1.0]) ax.set_title(title or f'{name} curve'.capitalize()) ax.grid() ax.legend() def plt_f1_curve(ax, pos, neg, title='F1 Curve', units: int = 500, xrange: Tuple[float, float] = (0.0, 1.0)): _create_score_curve(ax, 'F1', f1_score, pos, neg, title, units, xrange) def plt_p_curve(ax, pos, neg, title='Precision Curve', units: int = 500, xrange: Tuple[float, float] = (0.0, 1.0)): _create_score_curve(ax, 'precision', precision_score, pos, neg, title, units, xrange) def plt_r_curve(ax, pos, neg, title='Recall Curve', units: int = 500, xrange: Tuple[float, float] = (0.0, 1.0)): _create_score_curve(ax, 'recall', recall_score, pos, neg, title, units, xrange) def plt_pr_curve(ax, pos, neg, title='PR Curve'): y_true, y_score = _pos_neg_to_true_score(pos, neg) precision, recall, _ = precision_recall_curve(y_true, y_score) disp = PrecisionRecallDisplay(precision=precision, recall=recall) _map = -np.trapz(precision, recall) disp.plot(ax=ax, name=f'mAP {_map:.3f}') ax.set_title(title) ax.set_xlim([0.0, 1.0]) ax.set_ylim([0.0, 1.0]) ax.grid() ax.legend() def plt_roc_curve(ax, pos, neg, title: str = 'ROC Curve'): y_true, y_score = _pos_neg_to_true_score(pos, neg) fpr, tpr, thresholds = roc_curve(y_true, y_score) auc_value = auc(fpr, tpr) display = RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=auc_value) display.plot(ax=ax) ax.set_title(title) ax.set_xlim([0.0, 1.0]) ax.set_ylim([0.0, 1.0]) ax.grid() ax.legend() def get_threshold_with_f1(pos, neg, units: int = 500): y_true, y_score = _pos_neg_to_true_score(pos, neg) xs, ys = [], [] scores = np.sort(y_score, kind='heapsort') if len(scores) > units: scores = np.random.choice(scores, units) for score in np.sort(scores, kind='heapsort'): _y_pred = y_score >= score precision = f1_score(y_true, _y_pred, zero_division=1) xs.append(score) ys.append(precision) xs = np.array(xs) ys = np.array(ys) maxj = np.argmax(ys) return xs[maxj].item(), ys[maxj].item() def _to_numpy(x): if isinstance(x, torch.Tensor): return x.cpu().numpy() elif isinstance(x, np.ndarray): return x elif isinstance(x, dict): return type(x)({key: _to_numpy(value) for key, value in x.items()}) elif isinstance(x, (list, tuple)): return type(x)([_to_numpy(item) for item in x]) else: return x def plt_export(func, *args, figsize=(6, 6), **kwargs) -> Image.Image: fig = plt.Figure(figsize=figsize) fig.tight_layout() func(fig.gca(), *_to_numpy(args), **_to_numpy(kwargs)) with TemporaryDirectory() as td: imgfile = os.path.join(td, 'image.png') fig.savefig(imgfile) image = Image.open(imgfile) image.load() image = image.convert('RGB') return image Loading
Makefile +4 −4 Original line number Diff line number Diff line Loading @@ -55,8 +55,8 @@ dataset: if [ ! -d ${DATASET_DIR}/monochrome_danbooru ]; then \ git clone https://${HF_NARUGO_USERNAME}:${HF_NARUGO_PASSWORD}@huggingface.co/datasets/deepghs/monochrome_danbooru.git ${DATASET_DIR}/monochrome_danbooru; \ fi if [ ! -d ${DATASET_DIR}/images_xtiny_v0 ]; then \ mkdir -p ${DATASET_DIR}/images_xtiny_v0 && \ wget -O ${DATASET_DIR}/images_xtiny_v0/images_xtiny_v0.tar.xz https://huggingface.co/datasets/deepghs/character_similarity/resolve/main/images_xtiny_v0.tar.xz && \ cd ${DATASET_DIR}/images_xtiny_v0 && tar -xvf images_xtiny_v0.tar.xz && rm -rf *.tar.xz; \ if [ ! -d ${DATASET_DIR}/images_test_v1 ]; then \ mkdir -p ${DATASET_DIR}/images_test_v1 && \ curl -L -o ${DATASET_DIR}/images_test_v1/images_test_v1.tar.xz https://huggingface.co/datasets/deepghs/character_similarity/resolve/main/images_test_v1.tar.xz && \ cd ${DATASET_DIR}/images_test_v1 && tar -xvf images_test_v1.tar.xz && rm -rf *.tar.xz; \ fi
zoo/ccip/__main__.py +27 −138 Original line number Diff line number Diff line import glob import os.path import random import tempfile from functools import partial from typing import Optional, Tuple from typing import Optional import click import torch from ditk import logging from hbutils.testing import disable_output from huggingface_hub import hf_hub_download from sklearn import svm from sklearn.metrics import accuracy_score from torchvision import transforms from tqdm.auto import tqdm from imgutils.data import load_image from .dataset import TEST_TRANSFORM from .demo import _get_model_from_ckpt from .model import CCIP from .onnx import export_feat_model_to_onnx, export_metrics_model_to_onnx from .onnx import export_feat_model_to_onnx, export_metrics_model_to_onnx, get_scale_for_model, \ export_full_model_to_onnx from ..utils import GLOBAL_CONTEXT_SETTINGS from ..utils import print_version as _origin_print_version Loading @@ -35,95 +27,12 @@ def cli(): _CHECK_ITEMS = { 'full': export_full_model_to_onnx, 'feat': export_feat_model_to_onnx, 'metrics': export_metrics_model_to_onnx, } def _sample_analysis(poss, negs, svm_samples: int = 10000): poss_cnt, negs_cnt = poss.shape[0], negs.shape[0] total = poss_cnt + negs_cnt if total > svm_samples: s_poss = poss[random.sample(range(poss_cnt), k=int(round(poss_cnt * svm_samples / total)))] s_negs = negs[random.sample(range(negs_cnt), k=int(round(negs_cnt * svm_samples / total)))] else: s_poss, s_negs = poss, negs s_poss, s_negs = s_poss.cpu(), s_negs.cpu() features = torch.cat([s_poss, s_negs]).detach().numpy() labels = torch.cat([torch.ones_like(s_poss), -torch.ones_like(s_negs)]).detach().numpy() model = svm.SVC(kernel='linear') # 线性核 model.fit(features.reshape(-1, 1), labels) predictions = model.predict(features.reshape(-1, 1)) coef = model.coef_.reshape(-1)[0].tolist() inter = model.intercept_.reshape(-1)[0].tolist() threshold = -inter / coef return poss.mean().item(), poss.std().item(), \ negs.mean().item(), negs.std().item(), \ threshold, accuracy_score(labels, predictions) def _sample_safe_threshold(poss, negs, precision: float = 0.98) -> Tuple[float, float]: items = sorted([ *((v, 1) for v in poss), *((v, 0) for v in negs), ], key=lambda x: (-x[0], -x[1])) pos_cnt, neg_cnt = 0, 0 r_threshold, r_precision = None, None for i, (v, label) in enumerate(items): if label == 0: neg_cnt += 1 else: pos_cnt += 1 current_precision = pos_cnt / (pos_cnt + neg_cnt) if r_threshold is None or current_precision >= precision or current_precision > r_precision: if i == len(items) - 1: r_threshold = v else: v_next, _ = items[i + 1] r_threshold = (v + v_next) / 2 r_precision = current_precision return r_threshold, r_precision @torch.no_grad() def get_threshold_for_model(model: CCIP, preprocess, samples: int = 200, safe_precision: float = 0.98) \ -> Tuple[float, float, float, float]: def _get_sim(x, y): x, y = load_image(x, mode='RGB'), load_image(y, mode='RGB') input_ = torch.stack([preprocess(x), preprocess(y)]) return model(input_)[0][1] dataset_dir = 'test/testfile/dataset/images_xtiny_v0/' all_images = glob.glob(os.path.join(dataset_dir, '*', '*', '*.jpg')) all_chs = sorted(set([os.path.dirname(img) for img in all_images])) not_same_samples = [] for _ in tqdm(range(samples)): x_ch, y_ch = random.sample(all_chs, k=2) x_img = random.choice(glob.glob(os.path.join(x_ch, '*.jpg'))) y_img = random.choice(glob.glob(os.path.join(y_ch, '*.jpg'))) not_same_samples.append(_get_sim(x_img, y_img)) not_same_samples = torch.as_tensor(not_same_samples) same_samples = [] for _ in tqdm(range(samples)): ch = random.choice(all_chs) x_img, y_img = random.sample(glob.glob(os.path.join(ch, '*.jpg')), k=2) same_samples.append(_get_sim(x_img, y_img)) same_samples = torch.as_tensor(same_samples) _, _, _, _, threshold, accuracy = _sample_analysis(same_samples, not_same_samples, svm_samples=samples) safe_threshold, safe_prec = _sample_safe_threshold(same_samples, not_same_samples, precision=safe_precision) return threshold, accuracy, safe_threshold, safe_prec @cli.command('onnx_check', help='Check onnx export is okay or not') @click.option('--model', '-m', 'model', type=str, required=True, help='Model to be checked. ', show_default=True) Loading @@ -131,40 +40,28 @@ def get_threshold_for_model(model: CCIP, preprocess, samples: int = 200, safe_pr help='Show verbose information.', show_default=True) @click.option('--output_dir', '-O', 'output_dir', type=click.Path(file_okay=False), default=None, help='Output directory of all models.', show_default=True) @click.option('--threshold_samples', '-T', 'threshold_samples', type=int, default=500, help='Batch of samples to find threshold.', show_default=True) def onnx_check(model: str, verbose: bool = False, output_dir: Optional[str] = None, threshold_samples: int = 500): def onnx_check(model: str, verbose: bool = False, output_dir: Optional[str] = None): logging.try_init_root(logging.INFO) model, model_name = CCIP(model), model model.eval() logging.info('Finding threshold ...') threshold_mean, accuracy_mean, threshold_safe, precision_safe = get_threshold_for_model( model, transforms.Compose(TEST_TRANSFORM + model.preprocess), samples=threshold_samples, ) logging.info(f'Threshold: {threshold_mean:.4f}, accuracy: {accuracy_mean * 100.0:.2f}%') logging.info(f'Safe threshold: {threshold_safe:.4f}, accuracy: {precision_safe * 100.0:.2f}%') scale = get_scale_for_model(model) logging.info(f'Scale: {scale:.4f}') with tempfile.TemporaryDirectory() as td: for item, safe, threshold in [ ('feat', False, threshold_mean), ('metrics', False, threshold_mean), ('metrics', True, threshold_safe), ]: click.echo(click.style(f'Try exporting {model_name}(safe={safe!r})-->{item} to onnx ... '), nl=False) onnx_filename = os.path.join(output_dir or td, f'{model_name}_{"safe_" if safe else ""}{item}.onnx') for item in ['feat', 'metrics']: click.echo(click.style(f'Try exporting {model_name}-->{item} to onnx ... '), nl=False) onnx_filename = os.path.join(output_dir or td, f'{model_name}_{item}.onnx') export_func = _CHECK_ITEMS[item] try: model = CCIP(model_name) # necessary if verbose: export_func(model, threshold, onnx_filename, verbose=verbose) export_func(model, scale, onnx_filename, verbose=verbose) else: with disable_output(): export_func(model, threshold, onnx_filename, verbose=verbose) export_func(model, scale, onnx_filename, verbose=verbose) except: click.echo(click.style('FAILED', fg='red'), nl=True) raise Loading @@ -176,50 +73,42 @@ MODELS = [ # ('caformer', 'ccip-caformer-2_fp32.ckpt'), # ('caformer', 'ccip-caformer-4_fp32.ckpt'), # ('caformer', 'ccip-caformer-5_fp32.ckpt'), ('caformer', 'ccip-caformer-23_randaug_fp32.ckpt'), # ('caformer', 'ccip-caformer-23_randaug_fp32.ckpt'), ('caformer', 'ccip-caformer-24-randaug-pruned.ckpt'), ('caformer_query', 'ccip-caformer_query-12.ckpt'), ] @cli.command('export', help='Export all models as onnx.', context_settings={**GLOBAL_CONTEXT_SETTINGS}) @click.option('--repository', '-r', 'repository', type=str, default='deepghs/ccip', help='Source repository.', show_default=True) @click.option('--output_dir', '-O', 'output_dir', type=click.Path(file_okay=False), required=True, help='Output directory of all models.', show_default=True) @click.option('--verbose', '-V', 'verbose', is_flag=True, type=bool, default=False, help='Show verbose information.', show_default=True) @click.option('--threshold_samples', '-T', 'threshold_samples', type=int, default=500, help='Batch of samples to find threshold.', show_default=True) def export(output_dir: str, verbose: bool = False, threshold_samples: int = 500): def export(repository: str, output_dir: str, verbose: bool = False): for model_name, ckpt_name in MODELS: ckpt_file = hf_hub_download('deepghs/ccip', ckpt_name, repo_type='model') ckpt_file = hf_hub_download(repository, ckpt_name, repo_type='model') model, preprocess = _get_model_from_ckpt(model_name, ckpt_file, device='cpu', fp16=False) ckpt_body, _ = os.path.splitext(ckpt_name) logging.info(f'Finding threshold for {ckpt_name!r} ...') threshold_mean, accuracy_mean, threshold_safe, precision_safe = get_threshold_for_model( model, transforms.Compose(TEST_TRANSFORM + model.preprocess), samples=threshold_samples, ) logging.info(f'Threshold for {ckpt_file!r}: {threshold_mean:.4f}, accuracy: {accuracy_mean * 100.0:.2f}%') logging.info(f'Safe threshold for {ckpt_file!r}: {threshold_safe:.4f}, accuracy: {precision_safe * 100.0:.2f}%') scale = get_scale_for_model(model) logging.info(f'Scale for {ckpt_file!r}: {scale:.4f}') with tempfile.TemporaryDirectory() as td: for item, safe, threshold in [ ('feat', False, threshold_mean), ('metrics', False, threshold_mean), ('metrics', True, threshold_safe), ]: click.echo(click.style(f'Try exporting {ckpt_body!r}({model_name}, ' f'safe={safe!r})-->{item} to onnx ... '), nl=False) onnx_filename = os.path.join(output_dir or td, f'{ckpt_body}_{"safe_" if safe else ""}{item}.onnx') for item in ['feat', 'metrics']: click.echo(click.style(f'Try exporting {ckpt_body!r}({model_name})' f'-->{item} to onnx ... '), nl=False) onnx_filename = os.path.join(output_dir or td, f'{ckpt_body}_{item}.onnx') export_func = _CHECK_ITEMS[item] try: model, preprocess = _get_model_from_ckpt(model_name, ckpt_file, device='cpu', fp16=False) if verbose: export_func(model, threshold, onnx_filename, verbose=verbose) export_func(model, scale, onnx_filename, verbose=verbose) else: with disable_output(): export_func(model, threshold, onnx_filename, verbose=verbose) export_func(model, scale, onnx_filename, verbose=verbose) except: click.echo(click.style('FAILED', fg='red'), nl=True) raise Loading
zoo/ccip/model.py +5 −7 Original line number Diff line number Diff line Loading @@ -2,7 +2,6 @@ import numpy as np import torch.nn from torch import nn # from zoo.utils import get_testfile from .backbone import get_backbone Loading Loading @@ -53,15 +52,14 @@ class CCIP(nn.Module): return x class LogitToConfidence(nn.Module): def __init__(self, threshold): class LogitToDiff(nn.Module): def __init__(self, scale): nn.Module.__init__(self) self.register_buffer('threshold', torch.tensor(threshold)) self.threshold: torch.Tensor self.register_buffer('scale', torch.tensor(scale)) self.scale: torch.Tensor def forward(self, x): ex = x - self.threshold return torch.exp(ex) / (torch.exp(ex) + 1.0) return (self.scale - x) / (self.scale * 2) if __name__ == '__main__': Loading
zoo/ccip/onnx.py +26 −11 Original line number Diff line number Diff line Loading @@ -8,18 +8,18 @@ from torch import nn from torchvision import transforms from .dataset import TEST_TRANSFORM from .model import CCIP, LogitToConfidence from .model import CCIP, LogitToDiff from ..utils import get_testfile, onnx_optimize class ModelWithConfidence(nn.Module): def __init__(self, model, threshold): class ModelWithScaleAlign(nn.Module): def __init__(self, model, scale): nn.Module.__init__(self) self.model = model self.logit_to_conf = LogitToConfidence(threshold) self.logit_to_diff = LogitToDiff(scale) def forward(self, x): return self.logit_to_conf(self.model(x)) return torch.clip(self.logit_to_diff(self.model(x)), min=0.0, max=1.0) def get_batch_images(preprocess) -> torch.Tensor: Loading @@ -36,6 +36,21 @@ def get_batch_images(preprocess) -> torch.Tensor: ]) @torch.no_grad() def get_scale_for_model(model: CCIP): example_input = get_batch_images(model.preprocess) model = model.float() if torch.cuda.is_available(): example_input = example_input.cuda() model = model.cuda() else: example_input = example_input.cpu() model = model.cpu() dist = model(example_input) return dist[0, 0].detach().cpu().item() def _onnx_export(model, example_input, onnx_filename, opset_version: int = 14, verbose: bool = True, no_optimize: bool = False, dynamic_axes=None): model = model.float() Loading Loading @@ -70,11 +85,11 @@ def _onnx_export(model, example_input, onnx_filename, opset_version: int = 14, v onnx.save(model, onnx_filename) def export_full_model_to_onnx(model: CCIP, threshold: float, onnx_filename, opset_version: int = 14, def export_full_model_to_onnx(model: CCIP, scale: float, onnx_filename, opset_version: int = 14, verbose: bool = True, no_optimize: bool = False): example_input = get_batch_images(model.preprocess) return _onnx_export( ModelWithConfidence(model, threshold), example_input, ModelWithScaleAlign(model, scale), example_input, onnx_filename, opset_version, verbose, no_optimize, dynamic_axes={ "input": {0: "batch"}, Loading @@ -83,9 +98,9 @@ def export_full_model_to_onnx(model: CCIP, threshold: float, onnx_filename, opse ) def export_feat_model_to_onnx(model: CCIP, threshold: float, onnx_filename, opset_version: int = 14, def export_feat_model_to_onnx(model: CCIP, scale: float, onnx_filename, opset_version: int = 14, verbose: bool = True, no_optimize: bool = False): _ = threshold _ = scale example_input = get_batch_images(model.preprocess) return _onnx_export( model.feature, example_input, Loading @@ -97,14 +112,14 @@ def export_feat_model_to_onnx(model: CCIP, threshold: float, onnx_filename, opse ) def export_metrics_model_to_onnx(model: CCIP, threshold: float, onnx_filename, opset_version: int = 14, def export_metrics_model_to_onnx(model: CCIP, scale: float, onnx_filename, opset_version: int = 14, verbose: bool = True, no_optimize: bool = False): origin = get_batch_images(model.preprocess) with torch.no_grad(): example_input = model.feature(origin) return _onnx_export( ModelWithConfidence(model.metrics, threshold), example_input, ModelWithScaleAlign(model.metrics, scale), example_input, onnx_filename, opset_version, verbose, no_optimize, dynamic_axes={ "input": {0: "batch"}, Loading
zoo/ccip/plot.py 0 → 100644 +154 −0 Original line number Diff line number Diff line import os from typing import Tuple import numpy as np import torch from PIL import Image from hbutils.random import keep_global_state from hbutils.system import TemporaryDirectory from matplotlib import pyplot as plt from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay from sklearn.metrics import f1_score, precision_score, recall_score, precision_recall_curve, PrecisionRecallDisplay from sklearn.metrics import roc_curve, auc, RocCurveDisplay try: from typing import Literal except (ImportError, ModuleNotFoundError): from typing_extensions import Literal def _pos_neg_to_true_score(pos, neg): y_true = np.concatenate([np.ones_like(pos), np.zeros_like(neg)]) y_value = np.concatenate([pos, neg]) return y_true, y_value def plt_confusion_matrix(ax, y_true, y_pred, title: str = 'Confusion Matrix', normalize: Literal['true', 'pred', None] = None, cmap=None): cm = confusion_matrix(y_true, y_pred, normalize=normalize) disp = ConfusionMatrixDisplay( confusion_matrix=cm, display_labels=['Diff', 'Sim'], ) disp.plot(ax=ax, cmap=cmap or plt.cm.Blues) ax.set_yticklabels(ax.get_yticklabels(), rotation=90) ax.set_title(title) @keep_global_state() def _create_score_curve(ax, name, func, pos, neg, title=None, units: int = 500, xrange: Tuple[float, float] = (0.0, 1.0)): y_true, y_score = _pos_neg_to_true_score(pos, neg) xs, ys = [], [] scores = np.sort(y_score, kind='heapsort') if len(scores) > units: scores = np.random.choice(scores, units) for score in np.sort(scores, kind='heapsort'): _y_pred = y_score >= score precision = func(y_true, _y_pred, zero_division=1) xs.append(score) ys.append(precision) xs = np.array(xs) ys = np.array(ys) maxj = np.argmax(ys) ax.plot(xs, ys, label=f'{ys[maxj]:.2f} at {xs[maxj]:.3f}') ax.set_xlabel(f'score') ax.set_ylabel(f'{name}') ax.set_xlim(xrange) ax.set_ylim([0.0, 1.0]) ax.set_title(title or f'{name} curve'.capitalize()) ax.grid() ax.legend() def plt_f1_curve(ax, pos, neg, title='F1 Curve', units: int = 500, xrange: Tuple[float, float] = (0.0, 1.0)): _create_score_curve(ax, 'F1', f1_score, pos, neg, title, units, xrange) def plt_p_curve(ax, pos, neg, title='Precision Curve', units: int = 500, xrange: Tuple[float, float] = (0.0, 1.0)): _create_score_curve(ax, 'precision', precision_score, pos, neg, title, units, xrange) def plt_r_curve(ax, pos, neg, title='Recall Curve', units: int = 500, xrange: Tuple[float, float] = (0.0, 1.0)): _create_score_curve(ax, 'recall', recall_score, pos, neg, title, units, xrange) def plt_pr_curve(ax, pos, neg, title='PR Curve'): y_true, y_score = _pos_neg_to_true_score(pos, neg) precision, recall, _ = precision_recall_curve(y_true, y_score) disp = PrecisionRecallDisplay(precision=precision, recall=recall) _map = -np.trapz(precision, recall) disp.plot(ax=ax, name=f'mAP {_map:.3f}') ax.set_title(title) ax.set_xlim([0.0, 1.0]) ax.set_ylim([0.0, 1.0]) ax.grid() ax.legend() def plt_roc_curve(ax, pos, neg, title: str = 'ROC Curve'): y_true, y_score = _pos_neg_to_true_score(pos, neg) fpr, tpr, thresholds = roc_curve(y_true, y_score) auc_value = auc(fpr, tpr) display = RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=auc_value) display.plot(ax=ax) ax.set_title(title) ax.set_xlim([0.0, 1.0]) ax.set_ylim([0.0, 1.0]) ax.grid() ax.legend() def get_threshold_with_f1(pos, neg, units: int = 500): y_true, y_score = _pos_neg_to_true_score(pos, neg) xs, ys = [], [] scores = np.sort(y_score, kind='heapsort') if len(scores) > units: scores = np.random.choice(scores, units) for score in np.sort(scores, kind='heapsort'): _y_pred = y_score >= score precision = f1_score(y_true, _y_pred, zero_division=1) xs.append(score) ys.append(precision) xs = np.array(xs) ys = np.array(ys) maxj = np.argmax(ys) return xs[maxj].item(), ys[maxj].item() def _to_numpy(x): if isinstance(x, torch.Tensor): return x.cpu().numpy() elif isinstance(x, np.ndarray): return x elif isinstance(x, dict): return type(x)({key: _to_numpy(value) for key, value in x.items()}) elif isinstance(x, (list, tuple)): return type(x)([_to_numpy(item) for item in x]) else: return x def plt_export(func, *args, figsize=(6, 6), **kwargs) -> Image.Image: fig = plt.Figure(figsize=figsize) fig.tight_layout() func(fig.gca(), *_to_numpy(args), **_to_numpy(kwargs)) with TemporaryDirectory() as td: imgfile = os.path.join(td, 'image.png') fig.savefig(imgfile) image = Image.open(imgfile) image.load() image = image.convert('RGB') return image