Loading docs/source/api_doc/metrics/ccip_benchmark.plot.py +3 −3 Original line number Diff line number Diff line Loading @@ -19,7 +19,7 @@ class CCIPFeatureBenchmark(BaseBenchmark): def run(self): image_file = random.choice(self.all_images) _ = ccip_batch_extract_features([image_file], model_name=self.model_name) _ = ccip_batch_extract_features([image_file], model=self.model_name) class CCIPDiffBenchmark(BaseBenchmark): Loading @@ -28,7 +28,7 @@ class CCIPDiffBenchmark(BaseBenchmark): self.model_name = model_name def prepare(self): self.feats = list(ccip_batch_extract_features(random.sample(self.all_images, k=30), model_name=self.model_name)) self.feats = list(ccip_batch_extract_features(random.sample(self.all_images, k=30), model=self.model_name)) def load(self): from imgutils.metrics.ccip import _open_metric_model Loading @@ -41,7 +41,7 @@ class CCIPDiffBenchmark(BaseBenchmark): def run(self): feat1 = random.choice(self.feats) feat2 = random.choice(self.feats) _ = ccip_difference(feat1, feat2, model_name=self.model_name) _ = ccip_difference(feat1, feat2, model=self.model_name) if __name__ == '__main__': Loading imgutils/metrics/ccip.py +41 −41 Original line number Diff line number Diff line Loading @@ -46,30 +46,30 @@ def _preprocess_image(image: Image.Image, size: int = 384): @lru_cache() def _open_feat_model(model_name): def _open_feat_model(model): return open_onnx_model(hf_hub_download( f'deepghs/ccip_onnx', f'{model_name}/model_feat.onnx', f'{model}/model_feat.onnx', )) @lru_cache() def _open_metric_model(model_name): def _open_metric_model(model): return open_onnx_model(hf_hub_download( f'deepghs/ccip_onnx', f'{model_name}/model_metrics.onnx', f'{model}/model_metrics.onnx', )) @lru_cache() def _open_metrics(model_name): with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model_name}/metrics.json'), 'r') as f: def _open_metrics(model): with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model}/metrics.json'), 'r') as f: return json.load(f) @lru_cache() def _open_cluster_metrics(model_name): with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model_name}/cluster.json'), 'r') as f: def _open_cluster_metrics(model): with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model}/cluster.json'), 'r') as f: return json.load(f) Loading @@ -81,88 +81,88 @@ _VALID_MODEL_NAMES = [ _DEFAULT_MODEL_NAMES = 'ccip-caformer-24-randaug-pruned' def ccip_extract_feature(image: ImageTyping, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES): return ccip_batch_extract_features([image], size, model_name)[0] def ccip_extract_feature(image: ImageTyping, size: int = 384, model: str = _DEFAULT_MODEL_NAMES): return ccip_batch_extract_features([image], size, model)[0] def ccip_batch_extract_features(images: MultiImagesTyping, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES): def ccip_batch_extract_features(images: MultiImagesTyping, size: int = 384, model: str = _DEFAULT_MODEL_NAMES): images = load_images(images, mode='RGB') data = np.stack([_preprocess_image(item, size=size) for item in images]).astype(np.float32) output, = _open_feat_model(model_name).run(['output'], {'input': data}) output, = _open_feat_model(model).run(['output'], {'input': data}) return output _FeatureOrImage = Union[ImageTyping, np.ndarray] def _p_feature(x: _FeatureOrImage, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES): def _p_feature(x: _FeatureOrImage, size: int = 384, model: str = _DEFAULT_MODEL_NAMES): if isinstance(x, np.ndarray): # if feature return x else: # is image or path return ccip_extract_feature(x, size, model_name) return ccip_extract_feature(x, size, model) def ccip_default_threshold(model_name: str = _DEFAULT_MODEL_NAMES) -> float: return _open_metrics(model_name)['threshold'] def ccip_default_threshold(model: str = _DEFAULT_MODEL_NAMES) -> float: return _open_metrics(model)['threshold'] def ccip_difference(x: _FeatureOrImage, y: _FeatureOrImage, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> float: return ccip_batch_differences([x, y], size, model_name)[0, 1].item() size: int = 384, model: str = _DEFAULT_MODEL_NAMES) -> float: return ccip_batch_differences([x, y], size, model)[0, 1].item() def ccip_same(x: _FeatureOrImage, y: _FeatureOrImage, threshold: Optional[float] = None, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> float: diff = ccip_difference(x, y, size, model_name) threshold = threshold if threshold is not None else ccip_default_threshold(model_name) size: int = 384, model: str = _DEFAULT_MODEL_NAMES) -> float: diff = ccip_difference(x, y, size, model) threshold = threshold if threshold is not None else ccip_default_threshold(model) return diff <= threshold def ccip_batch_differences(images: List[_FeatureOrImage], size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> np.ndarray: input_ = np.stack([_p_feature(img, size, model_name) for img in images]).astype(np.float32) output, = _open_metric_model(model_name).run(['output'], {'input': input_}) size: int = 384, model: str = _DEFAULT_MODEL_NAMES) -> np.ndarray: input_ = np.stack([_p_feature(img, size, model) for img in images]).astype(np.float32) output, = _open_metric_model(model).run(['output'], {'input': input_}) return output def ccip_batch_same(images: List[_FeatureOrImage], threshold: Optional[float] = None, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> np.ndarray: batch_diff = ccip_batch_differences(images, size, model_name) threshold = threshold if threshold is not None else ccip_default_threshold(model_name) size: int = 384, model: str = _DEFAULT_MODEL_NAMES) -> np.ndarray: batch_diff = ccip_batch_differences(images, size, model) threshold = threshold if threshold is not None else ccip_default_threshold(model) return batch_diff <= threshold CCIPClusterModeTyping = Literal['dbscane', 'dbscan_2', 'dbscan_free', 'optics'] CCIPClusterMethodTyping = Literal['dbscane', 'dbscan_2', 'dbscan_free', 'optics'] def ccip_default_clustering_params(model_name: str = _DEFAULT_MODEL_NAMES, mode: CCIPClusterModeTyping = 'dbscan') -> Tuple[float, int]: if mode == 'dbscan': return ccip_default_threshold(model_name), 2 def ccip_default_clustering_params(model: str = _DEFAULT_MODEL_NAMES, method: CCIPClusterMethodTyping = 'dbscan') -> Tuple[float, int]: if method == 'dbscan': return ccip_default_threshold(model), 2 else: _info = _open_cluster_metrics(model_name)[mode] _info = _open_cluster_metrics(model)[method] return _info['eps'], _info['min_samples'] def ccip_clustering(images: List[_FeatureOrImage], mode: CCIPClusterModeTyping = 'dbscan', def ccip_clustering(images: List[_FeatureOrImage], method: CCIPClusterMethodTyping = 'dbscan', eps: Optional[float] = None, min_samples: Optional[int] = None, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> np.ndarray: _default_eps, _default_min_samples = ccip_default_clustering_params(model_name, mode) size: int = 384, model: str = _DEFAULT_MODEL_NAMES) -> np.ndarray: _default_eps, _default_min_samples = ccip_default_clustering_params(model, method) eps = eps or _default_eps min_samples = min_samples or _default_min_samples images = [_p_feature(img, size, model_name) for img in tqdm(images, desc='Extract features')] batch_diff = ccip_batch_differences(images, size, model_name) images = [_p_feature(img, size, model) for img in tqdm(images, desc='Extract features')] batch_diff = ccip_batch_differences(images, size, model) def _metric(x, y): return batch_diff[int(x), int(y)].item() samples = np.arange(len(images)).reshape(-1, 1) if 'dbscan' in mode: if 'dbscan' in method: clustering = DBSCAN(eps=eps, min_samples=min_samples, metric=_metric).fit(samples) elif mode == 'optics': elif method == 'optics': clustering = OPTICS(max_eps=eps, min_samples=min_samples, metric=_metric).fit(samples) else: raise ValueError(f'Unknown mode for CCIP clustering - {mode!r}.') raise ValueError(f'Unknown mode for CCIP clustering - {method!r}.') return clustering.labels_.tolist() Loading
docs/source/api_doc/metrics/ccip_benchmark.plot.py +3 −3 Original line number Diff line number Diff line Loading @@ -19,7 +19,7 @@ class CCIPFeatureBenchmark(BaseBenchmark): def run(self): image_file = random.choice(self.all_images) _ = ccip_batch_extract_features([image_file], model_name=self.model_name) _ = ccip_batch_extract_features([image_file], model=self.model_name) class CCIPDiffBenchmark(BaseBenchmark): Loading @@ -28,7 +28,7 @@ class CCIPDiffBenchmark(BaseBenchmark): self.model_name = model_name def prepare(self): self.feats = list(ccip_batch_extract_features(random.sample(self.all_images, k=30), model_name=self.model_name)) self.feats = list(ccip_batch_extract_features(random.sample(self.all_images, k=30), model=self.model_name)) def load(self): from imgutils.metrics.ccip import _open_metric_model Loading @@ -41,7 +41,7 @@ class CCIPDiffBenchmark(BaseBenchmark): def run(self): feat1 = random.choice(self.feats) feat2 = random.choice(self.feats) _ = ccip_difference(feat1, feat2, model_name=self.model_name) _ = ccip_difference(feat1, feat2, model=self.model_name) if __name__ == '__main__': Loading
imgutils/metrics/ccip.py +41 −41 Original line number Diff line number Diff line Loading @@ -46,30 +46,30 @@ def _preprocess_image(image: Image.Image, size: int = 384): @lru_cache() def _open_feat_model(model_name): def _open_feat_model(model): return open_onnx_model(hf_hub_download( f'deepghs/ccip_onnx', f'{model_name}/model_feat.onnx', f'{model}/model_feat.onnx', )) @lru_cache() def _open_metric_model(model_name): def _open_metric_model(model): return open_onnx_model(hf_hub_download( f'deepghs/ccip_onnx', f'{model_name}/model_metrics.onnx', f'{model}/model_metrics.onnx', )) @lru_cache() def _open_metrics(model_name): with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model_name}/metrics.json'), 'r') as f: def _open_metrics(model): with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model}/metrics.json'), 'r') as f: return json.load(f) @lru_cache() def _open_cluster_metrics(model_name): with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model_name}/cluster.json'), 'r') as f: def _open_cluster_metrics(model): with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model}/cluster.json'), 'r') as f: return json.load(f) Loading @@ -81,88 +81,88 @@ _VALID_MODEL_NAMES = [ _DEFAULT_MODEL_NAMES = 'ccip-caformer-24-randaug-pruned' def ccip_extract_feature(image: ImageTyping, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES): return ccip_batch_extract_features([image], size, model_name)[0] def ccip_extract_feature(image: ImageTyping, size: int = 384, model: str = _DEFAULT_MODEL_NAMES): return ccip_batch_extract_features([image], size, model)[0] def ccip_batch_extract_features(images: MultiImagesTyping, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES): def ccip_batch_extract_features(images: MultiImagesTyping, size: int = 384, model: str = _DEFAULT_MODEL_NAMES): images = load_images(images, mode='RGB') data = np.stack([_preprocess_image(item, size=size) for item in images]).astype(np.float32) output, = _open_feat_model(model_name).run(['output'], {'input': data}) output, = _open_feat_model(model).run(['output'], {'input': data}) return output _FeatureOrImage = Union[ImageTyping, np.ndarray] def _p_feature(x: _FeatureOrImage, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES): def _p_feature(x: _FeatureOrImage, size: int = 384, model: str = _DEFAULT_MODEL_NAMES): if isinstance(x, np.ndarray): # if feature return x else: # is image or path return ccip_extract_feature(x, size, model_name) return ccip_extract_feature(x, size, model) def ccip_default_threshold(model_name: str = _DEFAULT_MODEL_NAMES) -> float: return _open_metrics(model_name)['threshold'] def ccip_default_threshold(model: str = _DEFAULT_MODEL_NAMES) -> float: return _open_metrics(model)['threshold'] def ccip_difference(x: _FeatureOrImage, y: _FeatureOrImage, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> float: return ccip_batch_differences([x, y], size, model_name)[0, 1].item() size: int = 384, model: str = _DEFAULT_MODEL_NAMES) -> float: return ccip_batch_differences([x, y], size, model)[0, 1].item() def ccip_same(x: _FeatureOrImage, y: _FeatureOrImage, threshold: Optional[float] = None, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> float: diff = ccip_difference(x, y, size, model_name) threshold = threshold if threshold is not None else ccip_default_threshold(model_name) size: int = 384, model: str = _DEFAULT_MODEL_NAMES) -> float: diff = ccip_difference(x, y, size, model) threshold = threshold if threshold is not None else ccip_default_threshold(model) return diff <= threshold def ccip_batch_differences(images: List[_FeatureOrImage], size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> np.ndarray: input_ = np.stack([_p_feature(img, size, model_name) for img in images]).astype(np.float32) output, = _open_metric_model(model_name).run(['output'], {'input': input_}) size: int = 384, model: str = _DEFAULT_MODEL_NAMES) -> np.ndarray: input_ = np.stack([_p_feature(img, size, model) for img in images]).astype(np.float32) output, = _open_metric_model(model).run(['output'], {'input': input_}) return output def ccip_batch_same(images: List[_FeatureOrImage], threshold: Optional[float] = None, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> np.ndarray: batch_diff = ccip_batch_differences(images, size, model_name) threshold = threshold if threshold is not None else ccip_default_threshold(model_name) size: int = 384, model: str = _DEFAULT_MODEL_NAMES) -> np.ndarray: batch_diff = ccip_batch_differences(images, size, model) threshold = threshold if threshold is not None else ccip_default_threshold(model) return batch_diff <= threshold CCIPClusterModeTyping = Literal['dbscane', 'dbscan_2', 'dbscan_free', 'optics'] CCIPClusterMethodTyping = Literal['dbscane', 'dbscan_2', 'dbscan_free', 'optics'] def ccip_default_clustering_params(model_name: str = _DEFAULT_MODEL_NAMES, mode: CCIPClusterModeTyping = 'dbscan') -> Tuple[float, int]: if mode == 'dbscan': return ccip_default_threshold(model_name), 2 def ccip_default_clustering_params(model: str = _DEFAULT_MODEL_NAMES, method: CCIPClusterMethodTyping = 'dbscan') -> Tuple[float, int]: if method == 'dbscan': return ccip_default_threshold(model), 2 else: _info = _open_cluster_metrics(model_name)[mode] _info = _open_cluster_metrics(model)[method] return _info['eps'], _info['min_samples'] def ccip_clustering(images: List[_FeatureOrImage], mode: CCIPClusterModeTyping = 'dbscan', def ccip_clustering(images: List[_FeatureOrImage], method: CCIPClusterMethodTyping = 'dbscan', eps: Optional[float] = None, min_samples: Optional[int] = None, size: int = 384, model_name: str = _DEFAULT_MODEL_NAMES) -> np.ndarray: _default_eps, _default_min_samples = ccip_default_clustering_params(model_name, mode) size: int = 384, model: str = _DEFAULT_MODEL_NAMES) -> np.ndarray: _default_eps, _default_min_samples = ccip_default_clustering_params(model, method) eps = eps or _default_eps min_samples = min_samples or _default_min_samples images = [_p_feature(img, size, model_name) for img in tqdm(images, desc='Extract features')] batch_diff = ccip_batch_differences(images, size, model_name) images = [_p_feature(img, size, model) for img in tqdm(images, desc='Extract features')] batch_diff = ccip_batch_differences(images, size, model) def _metric(x, y): return batch_diff[int(x), int(y)].item() samples = np.arange(len(images)).reshape(-1, 1) if 'dbscan' in mode: if 'dbscan' in method: clustering = DBSCAN(eps=eps, min_samples=min_samples, metric=_metric).fit(samples) elif mode == 'optics': elif method == 'optics': clustering = OPTICS(max_eps=eps, min_samples=min_samples, metric=_metric).fit(samples) else: raise ValueError(f'Unknown mode for CCIP clustering - {mode!r}.') raise ValueError(f'Unknown mode for CCIP clustering - {method!r}.') return clustering.labels_.tolist()