Commit 0bb3fd97 authored by Clement Bois's avatar Clement Bois
Browse files

Merge branch 'feat/vex-upload' into 'main'

feat: add VEX file uploading feature

See merge request to-be-continuous/tools/dt-sbom-scanner!68
parents a4b94c5a 6f2cf04f
Loading
Loading
Loading
Loading
+17 −2
Original line number Diff line number Diff line
@@ -42,7 +42,8 @@ docker run \

```bash
usage: sbom-scanner [-h] [-u BASE_API_URL] [-k API_KEY] [-i] [-p PROJECT_PATH] [-s PATH_SEPARATOR] [-t TAGS] [--parent-collection-logic {NONE,ALL,TAG,LATEST}]
 [--parent-collection-logic-tag PARENT_COLLECTION_LOGIC_TAG] [-m] [-o MERGE_OUTPUT] [-l PURL_MAX_LEN] [-S] [-R RISK_SCORE_THRESHOLD] [sbom_patterns ...]
                    [--parent-collection-logic-tag PARENT_COLLECTION_LOGIC_TAG] [-m] [-o MERGE_OUTPUT] [-l PURL_MAX_LEN] [-U] [-V MERGED_VEX_FILE] [-S] [-R RISK_SCORE_THRESHOLD]
                    [sbom_patterns ...]

This tool scans for SBOM files and publishes them to a Dependency Track server.

@@ -77,6 +78,12 @@ SBOM management:
  -l PURL_MAX_LEN, --purl-max-len PURL_MAX_LEN
                        PURLs max length (-1: auto, 0: no trim, >0: trim to size - default: -1)

VEX:
  -U, --upload-vex      Upload VEX file after SBOM analysis (requires VULNERABILITY_ANALYSIS permission). The VEX file(s) are resolved based on the sbom pattern(s). The first part of the SBOM file name is used
                        to match it with a VEX file (e.g. if there is an SBOM file 'example.cyclonedx.json', the corresponding VEX file name must be 'example.vex.json')
  -V, --merged-vex-file MERGED_VEX_FILE
                        The VEX file to upload if multiple SBOMS are merged (--merge). Can only be used with --upload-vex and --merge.

Miscellaneous:
  -S, --show-findings   Wait for analysis and display found vulnerabilities
  -R RISK_SCORE_THRESHOLD, --risk-score-threshold RISK_SCORE_THRESHOLD
@@ -118,6 +125,13 @@ If none is specified, the program will look for SBOM files matching `**/*.cyclon
| `-o` / `--merge-output`         | `$DEPTRACK_MERGE_OUTPUT`                | Output merged SBOM file (only used with merge enabled) - for debugging purpose  |
| `-l` / `--purl-max-len`         | `$DEPTRACK_PURL_MAX_LEN`                | PURLs max length (`-1`: auto, `0`: no trim, `>0`: trim to size - default: `-1`) |

#### VEX

| CLI option                 | Env. Variable               | Description                                                                                                                                                                                                                                                                                                                                |
| -------------------------- | --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `-U` / `--upload-vex`      | `$DEPTRACK_UPLOAD_VEX`      | Upload VEX file after SBOM analysis (requires VULNERABILITY_ANALYSIS permission). The VEX file(s) are resolved based on the sbom pattern(s). The first part of the SBOM file name is used to match it with a VEX file (e.g. if there is an SBOM file 'example.cyclonedx.json', the corresponding VEX file name must be 'example.vex.json') |
| `-V` / `--merged-vex-file` | `$DEPTRACK_MERGED_VEX_FILE` | The VEX file to upload if multiple SBOMS are merged (--merge). Can only be used with --upload-vex and --merge.                                                                                                                                                                                                                             |

#### Miscellaneous

| CLI option                      | Env. Variable                           | Description                                                                    |
@@ -133,6 +147,7 @@ If none is specified, the program will look for SBOM files matching `**/*.cyclon
  Granting those permissions without enabling [Portfolio ACLs](https://github.com/DependencyTrack/dependency-track/issues/1127) is not recommended in the general case as give a read access to all projects.
- The extra `VIEW_PORTFOLIO` and `PORTFOLIO_MANAGEMENT` permissions are required if you want to automatically create one or several project ancestors prior to uploading the SBOM files.<br/>
  Granting those permissions is not recommended in the general case as they virtually give administration rights to the API Key owner.
- The extra `VULNERABILITY_ANALYSIS` permission is required if you want to upload VEX files after SBOM analysis.

## Project Path

@@ -193,7 +208,7 @@ A collection project won't be able to host a SBOM, however it will display all v
| `LATEST` | `AGGREGATE_LATEST_VERSION_CHILDREN`  | Parent project will collect metrics from direct children flagged as latest |
| `NONE`   | `NONE`                               | Parent project will behave like a normal project |

:information_source: since `sbom-scanner` do not update existing projects, collection logic will only be configured on parent project created by `sbom-scanner`
:information_source: since `sbom-scanner` does not update existing projects, collection logic will only be configured on the parent project created by `sbom-scanner`

See [Dependency Track documentation about collection project](https://docs.dependencytrack.org/usage/collection-projects/) for more details.

+212 −214

File changed.

Preview size limit exceeded, changes collapsed.

+178 −82
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ INSECURE_SSL_CTX.verify_mode = ssl.CERT_NONE

MIME_APPLICATION_JSON = "application/json"

IS_STR_TRUE = ["true", "yes", "1"]

@dataclass
class DtSeverity:
@@ -130,6 +131,18 @@ class DtProjectDef:
            return None
        return self.definition.split("@")[1] if "@" in self.definition else None

    @property
    def params(self) -> dict[str, str]:
        params = {}
        if self.is_uuid:
            # target project definition is a UUID: nothing more is required
            params["project"] = self.uuid
        else:
            # target project definition is a project name: assume exists or set autoCreate with parent if permission PROJECT_CREATION_UPLOAD
            params["projectName"] = self.name
            params["projectVersion"] = self.version
        return params


class Version:
    def __init__(self, version_str):
@@ -231,6 +244,29 @@ class Version:
            return 0, alpha


class ApiClient:
    def __init__(self, base_api_url: str, api_key: str, verify_ssl: bool):
        self.base_api_url = base_api_url
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": api_key,
            "accept": MIME_APPLICATION_JSON,
        })
        self.session.verify = verify_ssl

    def get(self, path, **kwargs):
        url = f"{self.base_api_url}{path}"
        return self.session.get(url, **kwargs)

    def post(self, path, **kwargs):
        url = f"{self.base_api_url}{path}"
        return self.session.post(url, **kwargs)

    def put(self, path, **kwargs):
        url = f"{self.base_api_url}{path}"
        return self.session.put(url, **kwargs)


class Scanner:
    def __init__(
        self,
@@ -247,16 +283,16 @@ class Scanner:
        tags: str = "",
        parent_collection_logic: str = CollectionLogic.ALL.name,
        parent_collection_logic_tag: str = "",
        upload_vex: bool = False,
        merged_vex_file = None,
        **_: None,
    ):
        self.base_api_url = base_api_url
        self.api_key = api_key
        self.api_client = ApiClient(base_api_url, api_key, verify_ssl)
        self.project_path = project_path
        self.path_separator = path_separator
        self._purl_max_len = purl_max_len
        self.merge = merge
        self.merge_output = merge_output
        self.verify_ssl = verify_ssl
        self.show_findings = show_findings
        self.risk_score_threshold = risk_score_threshold
        self.tags = list(filter(None, map(str.strip, tags.split(",")))) if tags else []
@@ -264,17 +300,15 @@ class Scanner:
        self.parent_collection_logic_tag = parent_collection_logic_tag
        self.sbom_count = 0
        self.sbom_scan_failed = 0
        self.upload_vex = upload_vex
        self.merged_vex_file = merged_vex_file

    @property
    @cache
    def dt_version(self) -> Version:
        """Determines the DT server version."""
        return Version(
            requests.get(
                f"{self.base_api_url}/version",
                headers={"accept": MIME_APPLICATION_JSON},
                verify=self.verify_ssl,
            ).json()["version"]
            self.api_client.get("/version").json()["version"]
        )

    @property
@@ -312,11 +346,7 @@ class Scanner:
    def get_permissions(self) -> list[DtPermission]:
        return [
            permission["name"]
            for permission in requests.get(
                f"{self.base_api_url}/v1/team/self",
                headers={"X-API-Key": self.api_key, "accept": MIME_APPLICATION_JSON},
                verify=self.verify_ssl,
            ).json()["permissions"]
            for permission in self.api_client.get("/v1/team/self").json()["permissions"]
        ]

    def has_permission(self, perm: DtPermission) -> bool:
@@ -337,11 +367,9 @@ class Scanner:
            return project_def.uuid

        # project is defined by name/version...
        resp = requests.get(
            f"{self.base_api_url}/v1/project",
            headers={"X-API-Key": self.api_key, "accept": MIME_APPLICATION_JSON},
        resp = self.api_client.get(
            "/v1/project",
            params={"name": project_def.name},
            verify=self.verify_ssl,
        )
        resp.raise_for_status()
        # find project with matching name/version
@@ -373,11 +401,9 @@ class Scanner:
                f"- {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET} found sibling (version: {name_match.get('version')}): {name_match['uuid']}..."
            )
            # now create a clone of the project
            resp = requests.put(
                f"{self.base_api_url}/v1/project/clone",
            resp = self.api_client.put(
                "/v1/project/clone",
                headers={
                    "X-API-Key": self.api_key,
                    "accept": MIME_APPLICATION_JSON,
                    "content-type": MIME_APPLICATION_JSON,
                },
                json={
@@ -390,19 +416,16 @@ class Scanner:
                    "includeAuditHistory": True,
                    "includeACL": True,
                },
                verify=self.verify_ssl,
            )
            try:
                resp.raise_for_status()
                # TODO: clone doesn't return UUID :(
                resp = requests.get(
                    f"{self.base_api_url}/v1/project/lookup",
                resp = self.api_client.get(
                    "/v1/project/lookup",
                    headers={
                        "X-API-Key": self.api_key,
                        "accept": MIME_APPLICATION_JSON,
                    },
                    params={"name": project_def.name, "version": project_def.version},
                    verify=self.verify_ssl,
                )
                resp.raise_for_status()
                # retrieve UUID from response and return
@@ -458,15 +481,12 @@ class Scanner:
        print(
            f"- {AnsiColors.YELLOW}{project_path}{AnsiColors.RESET} not found: create with params {AnsiColors.HGRAY}{json.dumps(data)}{AnsiColors.RESET}..."
        )
        resp = requests.put(
            f"{self.base_api_url}/v1/project",
        resp = self.api_client.put(
            "/v1/project",
            headers={
                "X-API-Key": self.api_key,
                "accept": MIME_APPLICATION_JSON,
                "content-type": MIME_APPLICATION_JSON,
            },
            json=data,
            verify=self.verify_ssl,
        )
        try:
            resp.raise_for_status()
@@ -482,7 +502,7 @@ class Scanner:
            )
            raise

    def publish(self, sbom: Bom, file_prefix: str):
    def publish(self, sbom: Bom, file_prefix: str, vex_file_path: Path):
        sbom_type = None
        sbom_name = None
        sbom_version = None
@@ -513,23 +533,16 @@ class Scanner:
            sbom_utils.trim_purls(sbom, self.purl_max_len)

        self.do_publish(
            sbom_utils.to_json(sbom, self.cdx_schema_version), project_path, sbom_type
            sbom_utils.to_json(sbom, self.cdx_schema_version), project_path, sbom_type, vex_file_path
        )

    def do_publish(
        self, sbom_json: str, project_path: str, sbom_type: str, allow_retry=True
        self, sbom_json: str, project_path: str, sbom_type: str, vex_file_path: Path, allow_retry=True
    ):
        project_path_parts = project_path.split(self.path_separator)
        # determine publish params
        params = {}
        project_def = DtProjectDef(project_path_parts[-1])
        if project_def.is_uuid:
            # target project definition is a UUID: nothing more is required
            params["project"] = project_def.uuid
        else:
            # target project definition is a project name: assume exists or set autoCreate with parent if permission PROJECT_CREATION_UPLOAD
            params["projectName"] = project_def.name
            params["projectVersion"] = project_def.version
        params = project_def.params

        if self.has_permission(DtPermission.PROJECT_CREATION_UPLOAD):
            params["autoCreate"] = "true"
@@ -547,12 +560,10 @@ class Scanner:
        print(
            f"- publish params: {AnsiColors.HGRAY}{json.dumps(params)}{AnsiColors.RESET}..."
        )
        resp = requests.post(
            f"{self.base_api_url}/v1/bom",
            headers={"X-API-Key": self.api_key, "accept": MIME_APPLICATION_JSON},
        resp = self.api_client.post(
            "/v1/bom",
            files={"bom": sbom_json},
            data=params,
            verify=self.verify_ssl,
        )
        try:
            resp.raise_for_status()
@@ -582,6 +593,7 @@ class Scanner:
                    sbom_json,
                    self.path_separator.join(project_path_parts),
                    sbom_type,
                    vex_file_path,
                    allow_retry=False,
                )
                # to prevent do_scan one more time (must have been done in the retried do_publish())
@@ -589,10 +601,44 @@ class Scanner:
            else:
                raise

        event_id = resp.json().get("token")

        # import VEX file
        if self.upload_vex:
            event_id = self.do_vex_publish(project_def, vex_file_path, event_id)

        if self.need_findings:
            event_id = resp.json()["token"]
            self.do_scan(project_def, event_id)

    def do_vex_publish(self, project_def: DtProjectDef, vex_file_path: Path,  event_id: str):
        self.wait_for_event_processing(event_id)

        if not vex_file_path.exists():
            print(
                f"- VEX file {AnsiColors.YELLOW}not found, skipping upload{AnsiColors.RESET}: {AnsiColors.HGRAY}{vex_file_path}{AnsiColors.RESET}"
            )
            return event_id

        with open(vex_file_path, "r") as vex_file:
            params = project_def.params
            resp = self.api_client.post(
                "/v1/vex",
                files={"vex": vex_file},
                data=params,
            )
            try:
                resp.raise_for_status()
                print(
                    f"- VEX import {AnsiColors.HGREEN}succeeded{AnsiColors.RESET}: {AnsiColors.HGRAY}{resp.text}{AnsiColors.RESET}"
                )
            except requests.exceptions.HTTPError as he:
                print(
                    f"- VEX import {AnsiColors.HRED}failed{AnsiColors.RESET} (err {he.response.status_code}): {AnsiColors.HGRAY}{he.response.text}{AnsiColors.RESET}",
                )
                raise

            return resp.json().get("token")

    def do_scan(self, project_def: DtProjectDef, event_id: str):
        print(f"- scan: {AnsiColors.HGRAY}{event_id}{AnsiColors.RESET}...")
        if project_def.is_uuid:
@@ -602,20 +648,16 @@ class Scanner:
            params["name"] = project_def.name
            if project_def.version:
                params["version"] = project_def.version
            resp = requests.get(
                f"{self.base_api_url}/v1/project/lookup",
                headers={"X-API-Key": self.api_key, "accept": MIME_APPLICATION_JSON},
            resp = self.api_client.get(
                "/v1/project/lookup",
                params=params,
                verify=self.verify_ssl,
            )
            project_id = resp.json().get("uuid")

        self.wait_for_event_processing(event_id)
        # MAYBE: get SBOM with VEX curl -sSf f"{self.base_api_url}/v1/bom/cyclonedx/project/{project_id}?variant=withVulnerabilities"
        resp = requests.get(
            f"{self.base_api_url}/v1/finding/project/{project_id}",
            headers={"X-API-Key": self.api_key, "accept": MIME_APPLICATION_JSON},
            verify=self.verify_ssl,
        resp = self.api_client.get(
            f"/v1/finding/project/{project_id}",
        )
        resp.raise_for_status()
        risk_score = 0
@@ -649,12 +691,10 @@ class Scanner:
    def wait_for_event_processing(self, event_id: str):
        for n in range(8):  # ~5 minutes
            sleep(2**n)
            resp = requests.get(
                f"{self.base_api_url}/v1/{self.event_token_path}/{event_id}",
                headers={"X-API-Key": self.api_key, "accept": MIME_APPLICATION_JSON},
                verify=self.verify_ssl,
            resp = self.api_client.get(
                f"/v1/{self.event_token_path}/{event_id}",
            )
            if resp.json().get("processing", False):
            if not resp.json().get("processing", False):
                break

    def scan(self, sbom_patterns: list[str]):
@@ -690,6 +730,10 @@ class Scanner:
                fail(
                    "VIEW_PORTFOLIO permission is mandatory to show finding or compute risk score after SBOM analysis"
                )
        if self.upload_vex and not self.has_permission(DtPermission.VULNERABILITY_ANALYSIS):
            fail(
              "VULNERABILITY_ANALYSIS permission is mandatory to import VEX files"
            )

        # scan for SBOM files
        sboms = []
@@ -698,13 +742,21 @@ class Scanner:
                print(
                    f"{AnsiColors.BOLD}📄 SBOM: {AnsiColors.BLUE}{file}{AnsiColors.RESET}"
                )
                # load the SBOM content
                file_path = Path(file)
                sbom = sbom_utils.load_bom(file_path)
                # load the SBOM and VEX content
                sbom_file_path = Path(file)
                sbom_file_prefix = sbom_file_path.name.split(".")[0]
                vex_file_path = sbom_file_path.with_name(f"{sbom_file_prefix}.vex.json")

                if self.upload_vex and not vex_file_path.exists():
                    fail(
                        f"- VEX file {AnsiColors.HRED}not found{AnsiColors.RESET}: {AnsiColors.HGRAY}{vex_file_path}{AnsiColors.RESET}"
                    )

                sbom = sbom_utils.load_bom(sbom_file_path)
                if self.merge:
                    sboms.append(sbom)
                else:
                    self.publish(sbom, file_path.name.split(".")[0])
                    self.publish(sbom, sbom_file_prefix, vex_file_path)

                print()
                self.sbom_count += 1
@@ -741,7 +793,13 @@ class Scanner:
                sbom_utils.save_bom(
                    merged_sbom, Path(self.merge_output), self.cdx_schema_version
                )
            self.publish(merged_sbom, "merged")
            vex_file_path = Path(self.merged_vex_file) if self.merged_vex_file else None
            if self.upload_vex and not vex_file_path.exists():
                fail(
                    f"- VEX file {AnsiColors.HRED}not found{AnsiColors.RESET}: {AnsiColors.HGRAY}{vex_file_path}{AnsiColors.RESET}"
                )

            self.publish(merged_sbom, "merged", vex_file_path)


def fail(msg: str) -> None:
@@ -774,7 +832,7 @@ def run() -> None:
        "-i",
        "--insecure",
        action="store_true",
        default=os.getenv("DEPTRACK_INSECURE") in ["true", "yes", "1"],
        default=os.getenv("DEPTRACK_INSECURE") in IS_STR_TRUE,
        help="Skip SSL verification",
    )

@@ -820,7 +878,7 @@ def run() -> None:
        "-m",
        "--merge",
        action="store_true",
        default=os.getenv("DEPTRACK_MERGE") in ["true", "yes", "1"],
        default=os.getenv("DEPTRACK_MERGE") in IS_STR_TRUE,
        help="Merge all SBOM files into one",
    )
    sbom_management_group.add_argument(
@@ -838,12 +896,28 @@ def run() -> None:
        help="PURLs max length (-1: auto, 0: no trim, >0: trim to size - default: -1)",
    )

    vex_group = parser.add_argument_group("VEX")
    vex_group.add_argument(
        "-U",
        "--upload-vex",
        action="store_true",
        default=os.getenv("DEPTRACK_UPLOAD_VEX") in IS_STR_TRUE,
        help="Upload VEX file after SBOM analysis (requires VULNERABILITY_ANALYSIS permission). The VEX file(s) are resolved based on the sbom pattern(s). The first part of the SBOM file name is used to match it with a VEX file (e.g. if there is an SBOM file 'example.cyclonedx.json', the corresponding VEX file name must be 'example.vex.json')",
    )
    vex_group.add_argument(
        "-V",
        "--merged-vex-file",
        type=str,
        default=os.getenv("DEPTRACK_MERGED_VEX_FILE"),
        help="The VEX file to upload if multiple SBOMS are merged (--merge). Can only be used with --upload-vex and --merge.",
    )

    misc_group = parser.add_argument_group("Miscellaneous")
    misc_group.add_argument(
        "-S",
        "--show-findings",
        action="store_true",
        default=os.getenv("DEPTRACK_SHOW_FINDINGS") in ["true", "yes", "1"],
        default=os.getenv("DEPTRACK_SHOW_FINDINGS") in IS_STR_TRUE,
        help="Wait for analysis and display found vulnerabilities",
    )
    misc_group.add_argument(
@@ -886,6 +960,22 @@ def run() -> None:
        fail(
            f"You need to specify a tag with --parent-collection-logic-tag (or DEPTRACK_PARENT_COLLECTION_LOGIC_TAG env var) if parent collection logic has been set to {CollectionLogic.TAG.name}"
        )
    if (
        args.merge
        and args.upload_vex
        and not args.merged_vex_file
    ):
        fail(
            "You need to specify a VEX file with --merged-vex-file (or DEPTRACK_MERGED_VEX_FILE env var) if you want to upload a VEX file and are merging SBOM files (--merge)"
        )
    if (
        not args.merge
        and args.upload_vex
        and args.merged_vex_file
    ):
        fail(
            "You cannot specify a VEX file with --merged-vex-file (or DEPTRACK_MERGED_VEX_FILE env var) if you are NOT merging SBOM files (--merge is not set)"
        )

    # print execution parameters
    print("Scanning SBOM files...")
@@ -927,6 +1017,12 @@ def run() -> None:
    print(
        f"- insecure             (--insecure): {AnsiColors.CYAN}{args.insecure}{AnsiColors.RESET}"
    )
    print(
        f"- Upload VEX         (--upload-vex): {AnsiColors.CYAN}{args.upload_vex}{AnsiColors.RESET}"
    )
    print(
        f"- VEX file path for merged SBOM (--merged-vex-file): {AnsiColors.CYAN}{args.merged_vex_file}{AnsiColors.RESET}"
    )
    print(
        f"- SBOM file pattern                : {AnsiColors.CYAN}{', '.join(args.sbom_patterns)}{AnsiColors.RESET}"
    )