Loading README.md +2 −2 Original line number Diff line number Diff line Loading @@ -22,7 +22,7 @@ This tool scans for SBOM files and publishes them to a Dependency Track server. positional arguments: sbom_patterns SBOM file patterns to publish (supports glob patterns) Default: '**/*.cyclonedx.json' Default: '**/*.cyclonedx.json **/*.cyclonedx.xml' options: -h, --help show this help message and exit Loading @@ -39,7 +39,7 @@ options: `sbom-scanner` accepts SBOM file patterns to publish (supports glob patterns) as multiple positional arguments. If none is specified, the program will look for SBOM files matching `**/*.cyclonedx.json`. If none is specified, the program will look for SBOM files matching `**/*.cyclonedx.json` and `**/*.cyclonedx.xml`. ### Options Loading sbom_scanner/scan.py +73 −74 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ import glob import json import os import ssl import sys from enum import Enum from functools import cache from logging import Logger Loading Loading @@ -129,30 +130,21 @@ class Scanner: headers={"X-API-Key": self.api_key, "accept": "application/json"}, ).json()["permissions"] ] # resp = urlopen( # Request( # f"{self.base_api_url}/v1/team/self", # headers={"X-API-Key": self.api_key, "Accept": "application/json"}, # method="GET", # ), # context=INSECURE_SSL_CTX if self.insecure else None, # ) # resp_data = json.load(resp) # return [permission["name"] for permission in resp_data["permissions"]] def has_permission(self, perm: DtPermission) -> bool: return perm in self.get_permissions() # rewinds the given project path and creates a DT project for each non-UUID defined project # retunrs the project path with created UUIDS def create_parent_projects(self, project_path: list[str]) -> list[str]: if len(project_path) == 0: return project_path project_def = DtProjectDef(project_path[-1]) # retunrs the tail project UUID @cache def get_or_create_project(self, project_path: str, classifier="application") -> str: project_path_parts = project_path.split("/") project_def = DtProjectDef(project_path_parts[-1]) if project_def.is_uuid: # TODO: check if project exists? print(f" ... {AnsiColors.YELLOW}{'/'.join(project_path)}{AnsiColors.RESET} is UUID: assume exists") return project_path print( f"- {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET} is UUID: assume exists..." ) return project_def.uuid # project is defined by name/version... resp = requests.get( Loading @@ -172,34 +164,33 @@ class Scanner: if matching_prj: # project already exists: replace name with found UUID print( f" ... {AnsiColors.YELLOW}{'/'.join(project_path)}{AnsiColors.RESET} found (by name/version): {matching_prj['uuid']}" f"- {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET} found (by name/version): {matching_prj['uuid']}..." ) project_path[-1] = "#" + matching_prj["uuid"] return project_path return matching_prj["uuid"] # TODO: if project exists but not the version, we have to CLONE it # project does not exist: create it # TODO: smart classifier data = { "name": project_def.name, "version": project_def.version, "classifier": "APPLICATION", "classifier": classifier.upper(), "active": True, } # TODO: externalReferences # data["externalReferences"] = [{"type":"vcs","url":project_url}], if len(project_path) > 1: if len(project_path_parts) > 1: # project to create is not a root project: retrieve parent parent_def = DtProjectDef(project_path[-2]) parent_def = DtProjectDef(project_path_parts[-2]) if not parent_def.is_uuid: # create parent project resolved_path = self.create_parent_projects(project_path[:-1]) parent_uuid = self.get_or_create_project("/".join(project_path_parts[:-1])) # now parent def must be a UUID parent_def = DtProjectDef(resolved_path[-1]) parent_def = DtProjectDef("#" + parent_uuid) # add parent UUID to params data["parent"] = {"uuid": parent_def.uuid} print( f" ... {AnsiColors.YELLOW}{'/'.join(project_path)}{AnsiColors.RESET} doesn't exist: create with params {AnsiColors.HGRAY}{json.dumps(data)}{AnsiColors.RESET}..." f"- {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET} not found: create with params {AnsiColors.HGRAY}{json.dumps(data)}{AnsiColors.RESET}..." ) resp = requests.put( f"{self.base_api_url}/v1/project", Loading @@ -214,15 +205,15 @@ class Scanner: resp.raise_for_status() except requests.exceptions.HTTPError as he: print( f" ... create {AnsiColors.YELLOW}{'/'.join(project_path)}{AnsiColors.RESET} {AnsiColors.HRED}failed{AnsiColors.RESET} ({he.response.status_code}): {AnsiColors.HGRAY}{he.response.text}{AnsiColors.RESET}", he, f"- create {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET} {AnsiColors.HRED}failed{AnsiColors.RESET} (err {he.response.status_code}): {AnsiColors.HGRAY}{he.response.text}{AnsiColors.RESET}", ) raise # retrieve UUID from response and return created_uuid = resp.json()["uuid"] print(f" ... {AnsiColors.YELLOW}{'/'.join(project_path)}{AnsiColors.RESET} created: {created_uuid}") project_path[-1] = "#" + created_uuid return project_path print( f"- {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET} {AnsiColors.HGREEN}successfully{AnsiColors.RESET} created: {created_uuid}" ) return created_uuid def publish(self, sbom_file: Path): print( Loading @@ -230,11 +221,10 @@ class Scanner: ) # compute the target project path sbom_prefix = sbom_file.name.split(".")[0] target_project_path = self.project_path.replace( "%{file_prefix}", sbom_prefix ).split("/") # sbom_extension = sbom_file.name.split(".")[-1] project_path = self.project_path.replace("%{file_prefix}", sbom_prefix) print( f" - target project: {AnsiColors.YELLOW}{'/'.join(target_project_path)}{AnsiColors.RESET}" f"- target project: {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET}..." ) # load the SBOM content with open(sbom_file, "r") as reader: Loading @@ -242,15 +232,14 @@ class Scanner: # normalize SBOM (shorten IDs, ...) # TODO self.do_publish(sbom_content, target_project_path) self.do_publish(sbom_content, project_path) self.sbom_count += 1 def do_publish( self, sbom_content: str, target_project_path: list[str], allow_retry=True ): def do_publish(self, sbom_content: str, project_path: str, allow_retry=True): project_path_parts = project_path.split("/") # determine publish params params = {} project_def = DtProjectDef(target_project_path[-1]) project_def = DtProjectDef(project_path_parts[-1]) if project_def.is_uuid: # target project definition is a UUID: nothing more is required params["project"] = project_def.uuid Loading @@ -261,8 +250,8 @@ class Scanner: if self.has_permission(DtPermission.PROJECT_CREATION_UPLOAD): params["autoCreate"] = "true" if len(target_project_path) > 1: parent_def = DtProjectDef(target_project_path[-2]) if len(project_path_parts) > 1: parent_def = DtProjectDef(project_path_parts[-2]) if parent_def.is_uuid: params["parentUUID"] = parent_def.uuid else: Loading @@ -273,33 +262,19 @@ class Scanner: print( f"- publish params: {AnsiColors.HGRAY}{json.dumps(params)}{AnsiColors.RESET}..." ) params["bom"] = sbom_content # with urllib # resp = urlopen( # Request( # f"{self.base_api_url}/v1/bom", # headers={"X-API-Key": self.api_key, "Content-Type": "application/json", "Accept": "application/json"}, # method="PUT", # data=json.dumps(params).encode(), # ), # context=INSECURE_SSL_CTX if self.insecure else None, # ) # resp_data = json.load(resp) # print(f" => publish response: {resp.getCode()} - {resp_data}") resp = requests.post( f"{self.base_api_url}/v1/bom", headers={"X-API-Key": self.api_key, "accept": "application/json"}, files=params, files={"bom": sbom_content, **params}, ) try: resp.raise_for_status() print( f" => publish {AnsiColors.HGREEN}succeeded{AnsiColors.RESET} ({resp.status_code}): {AnsiColors.HGRAY}{resp.text}{AnsiColors.RESET}" f"- publish {AnsiColors.HGREEN}succeeded{AnsiColors.RESET}: {AnsiColors.HGRAY}{resp.text}{AnsiColors.RESET}" ) except requests.exceptions.HTTPError as he: print( f" => publish {AnsiColors.HRED}failed{AnsiColors.RESET} ({he.response.status_code}): {AnsiColors.HGRAY}{he.response.text}{AnsiColors.RESET}", he, f"- publish {AnsiColors.HRED}failed{AnsiColors.RESET} (err {he.response.status_code}): {AnsiColors.HGRAY}{he.response.text}{AnsiColors.RESET}", ) if ( he.response.status_code == 404 Loading @@ -308,12 +283,15 @@ class Scanner: and allow_retry ): # try to create parent projects print( f" try to create full project path {AnsiColors.YELLOW}{'/'.join(target_project_path)}{AnsiColors.RESET}...", ) target_project_path = self.create_parent_projects(target_project_path) print("- create projects...") # replace last path part with project UUID # TODO: retrieve classifier from SBOM project_path_parts[-1] = "#" + self.get_or_create_project(project_path) # then retry self.do_publish(sbom_content, target_project_path, allow_retry=False) print("- retry publish...") self.do_publish( sbom_content, "/".join(project_path_parts), allow_retry=False ) else: raise Loading @@ -322,15 +300,22 @@ class Scanner: f"🗝 API key has permissions: {AnsiColors.BLUE}{', '.join(self.get_permissions())}{AnsiColors.RESET}" ) print() assert self.has_permission( DtPermission.BOM_UPLOAD ), "BOM_UPLOAD permission is mandatory to upload SBOM files to Dependency Track server" if not self.has_permission(DtPermission.BOM_UPLOAD): fail( "BOM_UPLOAD permission is mandatory to publish SBOM files to Dependency Track server" ) # scan for SBOM files for pattern in sbom_patterns: for file in glob.glob(pattern): self.publish(Path(file)) print() def fail(msg: str) -> None: print(f"{AnsiColors.HRED}ERROR{AnsiColors.RESET} {msg}") sys.exit(1) def run() -> None: # define command parser parser = argparse.ArgumentParser( Loading Loading @@ -365,17 +350,30 @@ def run() -> None: parser.add_argument( "sbom_patterns", nargs="*", default=os.getenv("DEPTRACK_SBOM_PATTERNS", "**/*.cyclonedx.json").split(" "), default=os.getenv( "DEPTRACK_SBOM_PATTERNS", "**/*.cyclonedx.json **/*.cyclonedx.xml" ).split(" "), help="SBOM file patterns to publish (supports glob patterns)", ) # parse command and args args = parser.parse_args() assert args.base_api_url, "Dependency Track server base API url is required (use --base-api-url CLI option or DEPTRACK_BASE_API_URL variable)" assert args.api_key, "Dependency Track API key (use --api_key CLI option or DEPTRACK_API_KEY variable)" assert args.project_path, "Dependency Track target project path is required (use --project-path CLI option or DEPTRACK_PROJECT_PATH variable)" # check required args if not args.base_api_url and not os.getenv("DEPTRACK_BASE_API_URL"): fail( "Dependency Track server base API url is required (use --base-api-url CLI option or DEPTRACK_BASE_API_URL variable)" ) if not args.api_key and not os.getenv("DEPTRACK_API_KEY"): fail( "Dependency Track API key is required (use --api-key CLI option or DEPTRACK_API_KEY variable)" ) if not args.project_path and not os.getenv("DEPTRACK_PROJECT_PATH"): fail( "Dependency Track target project path is required (use --project-path CLI option or DEPTRACK_PROJECT_PATH variable)" ) # print execution parameters print("Scanning SBOM files...") print( f"- base API url (--base-api-url) : {AnsiColors.CYAN}{args.base_api_url}{AnsiColors.RESET}" Loading @@ -391,6 +389,7 @@ def run() -> None: ) print() # execute the scan scanner = Scanner( base_api_url=args.base_api_url, api_key=args.api_key, Loading Loading
README.md +2 −2 Original line number Diff line number Diff line Loading @@ -22,7 +22,7 @@ This tool scans for SBOM files and publishes them to a Dependency Track server. positional arguments: sbom_patterns SBOM file patterns to publish (supports glob patterns) Default: '**/*.cyclonedx.json' Default: '**/*.cyclonedx.json **/*.cyclonedx.xml' options: -h, --help show this help message and exit Loading @@ -39,7 +39,7 @@ options: `sbom-scanner` accepts SBOM file patterns to publish (supports glob patterns) as multiple positional arguments. If none is specified, the program will look for SBOM files matching `**/*.cyclonedx.json`. If none is specified, the program will look for SBOM files matching `**/*.cyclonedx.json` and `**/*.cyclonedx.xml`. ### Options Loading
sbom_scanner/scan.py +73 −74 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ import glob import json import os import ssl import sys from enum import Enum from functools import cache from logging import Logger Loading Loading @@ -129,30 +130,21 @@ class Scanner: headers={"X-API-Key": self.api_key, "accept": "application/json"}, ).json()["permissions"] ] # resp = urlopen( # Request( # f"{self.base_api_url}/v1/team/self", # headers={"X-API-Key": self.api_key, "Accept": "application/json"}, # method="GET", # ), # context=INSECURE_SSL_CTX if self.insecure else None, # ) # resp_data = json.load(resp) # return [permission["name"] for permission in resp_data["permissions"]] def has_permission(self, perm: DtPermission) -> bool: return perm in self.get_permissions() # rewinds the given project path and creates a DT project for each non-UUID defined project # retunrs the project path with created UUIDS def create_parent_projects(self, project_path: list[str]) -> list[str]: if len(project_path) == 0: return project_path project_def = DtProjectDef(project_path[-1]) # retunrs the tail project UUID @cache def get_or_create_project(self, project_path: str, classifier="application") -> str: project_path_parts = project_path.split("/") project_def = DtProjectDef(project_path_parts[-1]) if project_def.is_uuid: # TODO: check if project exists? print(f" ... {AnsiColors.YELLOW}{'/'.join(project_path)}{AnsiColors.RESET} is UUID: assume exists") return project_path print( f"- {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET} is UUID: assume exists..." ) return project_def.uuid # project is defined by name/version... resp = requests.get( Loading @@ -172,34 +164,33 @@ class Scanner: if matching_prj: # project already exists: replace name with found UUID print( f" ... {AnsiColors.YELLOW}{'/'.join(project_path)}{AnsiColors.RESET} found (by name/version): {matching_prj['uuid']}" f"- {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET} found (by name/version): {matching_prj['uuid']}..." ) project_path[-1] = "#" + matching_prj["uuid"] return project_path return matching_prj["uuid"] # TODO: if project exists but not the version, we have to CLONE it # project does not exist: create it # TODO: smart classifier data = { "name": project_def.name, "version": project_def.version, "classifier": "APPLICATION", "classifier": classifier.upper(), "active": True, } # TODO: externalReferences # data["externalReferences"] = [{"type":"vcs","url":project_url}], if len(project_path) > 1: if len(project_path_parts) > 1: # project to create is not a root project: retrieve parent parent_def = DtProjectDef(project_path[-2]) parent_def = DtProjectDef(project_path_parts[-2]) if not parent_def.is_uuid: # create parent project resolved_path = self.create_parent_projects(project_path[:-1]) parent_uuid = self.get_or_create_project("/".join(project_path_parts[:-1])) # now parent def must be a UUID parent_def = DtProjectDef(resolved_path[-1]) parent_def = DtProjectDef("#" + parent_uuid) # add parent UUID to params data["parent"] = {"uuid": parent_def.uuid} print( f" ... {AnsiColors.YELLOW}{'/'.join(project_path)}{AnsiColors.RESET} doesn't exist: create with params {AnsiColors.HGRAY}{json.dumps(data)}{AnsiColors.RESET}..." f"- {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET} not found: create with params {AnsiColors.HGRAY}{json.dumps(data)}{AnsiColors.RESET}..." ) resp = requests.put( f"{self.base_api_url}/v1/project", Loading @@ -214,15 +205,15 @@ class Scanner: resp.raise_for_status() except requests.exceptions.HTTPError as he: print( f" ... create {AnsiColors.YELLOW}{'/'.join(project_path)}{AnsiColors.RESET} {AnsiColors.HRED}failed{AnsiColors.RESET} ({he.response.status_code}): {AnsiColors.HGRAY}{he.response.text}{AnsiColors.RESET}", he, f"- create {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET} {AnsiColors.HRED}failed{AnsiColors.RESET} (err {he.response.status_code}): {AnsiColors.HGRAY}{he.response.text}{AnsiColors.RESET}", ) raise # retrieve UUID from response and return created_uuid = resp.json()["uuid"] print(f" ... {AnsiColors.YELLOW}{'/'.join(project_path)}{AnsiColors.RESET} created: {created_uuid}") project_path[-1] = "#" + created_uuid return project_path print( f"- {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET} {AnsiColors.HGREEN}successfully{AnsiColors.RESET} created: {created_uuid}" ) return created_uuid def publish(self, sbom_file: Path): print( Loading @@ -230,11 +221,10 @@ class Scanner: ) # compute the target project path sbom_prefix = sbom_file.name.split(".")[0] target_project_path = self.project_path.replace( "%{file_prefix}", sbom_prefix ).split("/") # sbom_extension = sbom_file.name.split(".")[-1] project_path = self.project_path.replace("%{file_prefix}", sbom_prefix) print( f" - target project: {AnsiColors.YELLOW}{'/'.join(target_project_path)}{AnsiColors.RESET}" f"- target project: {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET}..." ) # load the SBOM content with open(sbom_file, "r") as reader: Loading @@ -242,15 +232,14 @@ class Scanner: # normalize SBOM (shorten IDs, ...) # TODO self.do_publish(sbom_content, target_project_path) self.do_publish(sbom_content, project_path) self.sbom_count += 1 def do_publish( self, sbom_content: str, target_project_path: list[str], allow_retry=True ): def do_publish(self, sbom_content: str, project_path: str, allow_retry=True): project_path_parts = project_path.split("/") # determine publish params params = {} project_def = DtProjectDef(target_project_path[-1]) project_def = DtProjectDef(project_path_parts[-1]) if project_def.is_uuid: # target project definition is a UUID: nothing more is required params["project"] = project_def.uuid Loading @@ -261,8 +250,8 @@ class Scanner: if self.has_permission(DtPermission.PROJECT_CREATION_UPLOAD): params["autoCreate"] = "true" if len(target_project_path) > 1: parent_def = DtProjectDef(target_project_path[-2]) if len(project_path_parts) > 1: parent_def = DtProjectDef(project_path_parts[-2]) if parent_def.is_uuid: params["parentUUID"] = parent_def.uuid else: Loading @@ -273,33 +262,19 @@ class Scanner: print( f"- publish params: {AnsiColors.HGRAY}{json.dumps(params)}{AnsiColors.RESET}..." ) params["bom"] = sbom_content # with urllib # resp = urlopen( # Request( # f"{self.base_api_url}/v1/bom", # headers={"X-API-Key": self.api_key, "Content-Type": "application/json", "Accept": "application/json"}, # method="PUT", # data=json.dumps(params).encode(), # ), # context=INSECURE_SSL_CTX if self.insecure else None, # ) # resp_data = json.load(resp) # print(f" => publish response: {resp.getCode()} - {resp_data}") resp = requests.post( f"{self.base_api_url}/v1/bom", headers={"X-API-Key": self.api_key, "accept": "application/json"}, files=params, files={"bom": sbom_content, **params}, ) try: resp.raise_for_status() print( f" => publish {AnsiColors.HGREEN}succeeded{AnsiColors.RESET} ({resp.status_code}): {AnsiColors.HGRAY}{resp.text}{AnsiColors.RESET}" f"- publish {AnsiColors.HGREEN}succeeded{AnsiColors.RESET}: {AnsiColors.HGRAY}{resp.text}{AnsiColors.RESET}" ) except requests.exceptions.HTTPError as he: print( f" => publish {AnsiColors.HRED}failed{AnsiColors.RESET} ({he.response.status_code}): {AnsiColors.HGRAY}{he.response.text}{AnsiColors.RESET}", he, f"- publish {AnsiColors.HRED}failed{AnsiColors.RESET} (err {he.response.status_code}): {AnsiColors.HGRAY}{he.response.text}{AnsiColors.RESET}", ) if ( he.response.status_code == 404 Loading @@ -308,12 +283,15 @@ class Scanner: and allow_retry ): # try to create parent projects print( f" try to create full project path {AnsiColors.YELLOW}{'/'.join(target_project_path)}{AnsiColors.RESET}...", ) target_project_path = self.create_parent_projects(target_project_path) print("- create projects...") # replace last path part with project UUID # TODO: retrieve classifier from SBOM project_path_parts[-1] = "#" + self.get_or_create_project(project_path) # then retry self.do_publish(sbom_content, target_project_path, allow_retry=False) print("- retry publish...") self.do_publish( sbom_content, "/".join(project_path_parts), allow_retry=False ) else: raise Loading @@ -322,15 +300,22 @@ class Scanner: f"🗝 API key has permissions: {AnsiColors.BLUE}{', '.join(self.get_permissions())}{AnsiColors.RESET}" ) print() assert self.has_permission( DtPermission.BOM_UPLOAD ), "BOM_UPLOAD permission is mandatory to upload SBOM files to Dependency Track server" if not self.has_permission(DtPermission.BOM_UPLOAD): fail( "BOM_UPLOAD permission is mandatory to publish SBOM files to Dependency Track server" ) # scan for SBOM files for pattern in sbom_patterns: for file in glob.glob(pattern): self.publish(Path(file)) print() def fail(msg: str) -> None: print(f"{AnsiColors.HRED}ERROR{AnsiColors.RESET} {msg}") sys.exit(1) def run() -> None: # define command parser parser = argparse.ArgumentParser( Loading Loading @@ -365,17 +350,30 @@ def run() -> None: parser.add_argument( "sbom_patterns", nargs="*", default=os.getenv("DEPTRACK_SBOM_PATTERNS", "**/*.cyclonedx.json").split(" "), default=os.getenv( "DEPTRACK_SBOM_PATTERNS", "**/*.cyclonedx.json **/*.cyclonedx.xml" ).split(" "), help="SBOM file patterns to publish (supports glob patterns)", ) # parse command and args args = parser.parse_args() assert args.base_api_url, "Dependency Track server base API url is required (use --base-api-url CLI option or DEPTRACK_BASE_API_URL variable)" assert args.api_key, "Dependency Track API key (use --api_key CLI option or DEPTRACK_API_KEY variable)" assert args.project_path, "Dependency Track target project path is required (use --project-path CLI option or DEPTRACK_PROJECT_PATH variable)" # check required args if not args.base_api_url and not os.getenv("DEPTRACK_BASE_API_URL"): fail( "Dependency Track server base API url is required (use --base-api-url CLI option or DEPTRACK_BASE_API_URL variable)" ) if not args.api_key and not os.getenv("DEPTRACK_API_KEY"): fail( "Dependency Track API key is required (use --api-key CLI option or DEPTRACK_API_KEY variable)" ) if not args.project_path and not os.getenv("DEPTRACK_PROJECT_PATH"): fail( "Dependency Track target project path is required (use --project-path CLI option or DEPTRACK_PROJECT_PATH variable)" ) # print execution parameters print("Scanning SBOM files...") print( f"- base API url (--base-api-url) : {AnsiColors.CYAN}{args.base_api_url}{AnsiColors.RESET}" Loading @@ -391,6 +389,7 @@ def run() -> None: ) print() # execute the scan scanner = Scanner( base_api_url=args.base_api_url, api_key=args.api_key, Loading