Skip to content

Commit efc42ce

Browse files
authored
CM-24498 - Add SBOM reports (#164)
CM-26214, CM-26215, CM-26216, CM-27640, CM-26254 - Add SBOM reports
1 parent b9bbd66 commit efc42ce

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1315
-554
lines changed

cycode/cli/code_scanner.py

+57-405
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import click
2+
3+
from cycode.cli.commands.report.sbom.sbom_command import sbom_command
4+
from cycode.cli.utils.get_api_client import get_report_cycode_client
5+
from cycode.cli.utils.progress_bar import SBOM_REPORT_PROGRESS_BAR_SECTIONS, get_progress_bar
6+
7+
8+
@click.group(
9+
commands={
10+
'sbom': sbom_command,
11+
},
12+
short_help='Generate report. You`ll need to specify which report type to perform.',
13+
)
14+
@click.pass_context
15+
def report_command(
16+
context: click.Context,
17+
) -> int:
18+
"""Generate report."""
19+
20+
context.obj['client'] = get_report_cycode_client(hide_response_log=False) # TODO disable log
21+
context.obj['progress_bar'] = get_progress_bar(hidden=False, sections=SBOM_REPORT_PROGRESS_BAR_SECTIONS)
22+
23+
return 1
+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import pathlib
2+
import time
3+
from platform import platform
4+
from typing import TYPE_CHECKING, Optional
5+
6+
from cycode.cli import consts
7+
from cycode.cli.commands.report.sbom.sbom_report_file import SbomReportFile
8+
from cycode.cli.config import configuration_manager
9+
from cycode.cli.exceptions.custom_exceptions import ReportAsyncError
10+
from cycode.cli.utils.progress_bar import SbomReportProgressBarSection
11+
from cycode.cyclient import logger
12+
from cycode.cyclient.models import ReportExecutionSchema
13+
14+
if TYPE_CHECKING:
15+
from cycode.cli.utils.progress_bar import BaseProgressBar
16+
from cycode.cyclient.report_client import ReportClient
17+
18+
19+
def _poll_report_execution_until_completed(
20+
progress_bar: 'BaseProgressBar',
21+
client: 'ReportClient',
22+
report_execution_id: int,
23+
polling_timeout: Optional[int] = None,
24+
) -> ReportExecutionSchema:
25+
if polling_timeout is None:
26+
polling_timeout = configuration_manager.get_report_polling_timeout_in_seconds()
27+
28+
end_polling_time = time.time() + polling_timeout
29+
while time.time() < end_polling_time:
30+
report_execution = client.get_report_execution(report_execution_id)
31+
report_label = report_execution.error_message or report_execution.status_message
32+
33+
progress_bar.update_label(report_label)
34+
35+
if report_execution.status == consts.REPORT_STATUS_COMPLETED:
36+
return report_execution
37+
38+
if report_execution.status == consts.REPORT_STATUS_ERROR:
39+
raise ReportAsyncError(f'Error occurred while trying to generate report: {report_label}')
40+
41+
time.sleep(consts.REPORT_POLLING_WAIT_INTERVAL_IN_SECONDS)
42+
43+
raise ReportAsyncError(f'Timeout exceeded while waiting for report to complete. Timeout: {polling_timeout} sec.')
44+
45+
46+
def send_report_feedback(
47+
client: 'ReportClient',
48+
start_scan_time: float,
49+
report_type: str,
50+
report_command_type: str,
51+
request_report_parameters: dict,
52+
report_execution_id: int,
53+
error_message: Optional[str] = None,
54+
request_zip_file_size: Optional[int] = None,
55+
**kwargs,
56+
) -> None:
57+
try:
58+
request_report_parameters.update(kwargs)
59+
60+
end_scan_time = time.time()
61+
scan_status = {
62+
'report_type': report_type,
63+
'report_command_type': report_command_type,
64+
'request_report_parameters': request_report_parameters,
65+
'operation_system': platform(),
66+
'error_message': error_message,
67+
'execution_time': int(end_scan_time - start_scan_time),
68+
'request_zip_file_size': request_zip_file_size,
69+
}
70+
71+
client.report_status(report_execution_id, scan_status)
72+
except Exception as e:
73+
logger.debug(f'Failed to send report feedback: {e}')
74+
75+
76+
def create_sbom_report(
77+
progress_bar: 'BaseProgressBar',
78+
client: 'ReportClient',
79+
report_execution_id: int,
80+
output_file: Optional[pathlib.Path],
81+
output_format: str,
82+
) -> None:
83+
report_execution = _poll_report_execution_until_completed(progress_bar, client, report_execution_id)
84+
85+
progress_bar.set_section_length(SbomReportProgressBarSection.GENERATION)
86+
87+
report_path = report_execution.storage_details.path
88+
report_content = client.get_file_content(report_path)
89+
90+
progress_bar.set_section_length(SbomReportProgressBarSection.RECEIVE_REPORT)
91+
progress_bar.stop()
92+
93+
sbom_report = SbomReportFile(report_path, output_format, output_file)
94+
sbom_report.write(report_content)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import traceback
2+
from typing import Optional
3+
4+
import click
5+
6+
from cycode.cli.exceptions import custom_exceptions
7+
from cycode.cli.models import CliError, CliErrors
8+
from cycode.cli.printers import ConsolePrinter
9+
10+
11+
def handle_report_exception(context: click.Context, err: Exception) -> Optional[CliError]:
12+
if context.obj['verbose']:
13+
click.secho(f'Error: {traceback.format_exc()}', fg='red')
14+
15+
errors: CliErrors = {
16+
custom_exceptions.NetworkError: CliError(
17+
code='cycode_error',
18+
message='Cycode was unable to complete this report. '
19+
'Please try again by executing the `cycode report` command',
20+
),
21+
custom_exceptions.ScanAsyncError: CliError(
22+
code='report_error',
23+
message='Cycode was unable to complete this report. '
24+
'Please try again by executing the `cycode report` command',
25+
),
26+
custom_exceptions.ReportAsyncError: CliError(
27+
code='report_error',
28+
message='Cycode was unable to complete this report. '
29+
'Please try again by executing the `cycode report` command',
30+
),
31+
custom_exceptions.HttpUnauthorizedError: CliError(
32+
code='auth_error',
33+
message='Unable to authenticate to Cycode, your token is either invalid or has expired. '
34+
'Please re-generate your token and reconfigure it by running the `cycode configure` command',
35+
),
36+
}
37+
38+
if type(err) in errors:
39+
error = errors[type(err)]
40+
41+
ConsolePrinter(context).print_error(error)
42+
return None
43+
44+
if isinstance(err, click.ClickException):
45+
raise err
46+
47+
raise click.ClickException(str(err))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import pathlib
2+
from typing import Optional
3+
4+
import click
5+
6+
from cycode.cli.commands.report.sbom.sbom_path_command import sbom_path_command
7+
from cycode.cli.commands.report.sbom.sbom_repository_url_command import sbom_repository_url_command
8+
from cycode.cli.config import config
9+
from cycode.cyclient.report_client import ReportParameters
10+
11+
12+
@click.group(
13+
commands={
14+
'path': sbom_path_command,
15+
'repository_url': sbom_repository_url_command,
16+
},
17+
short_help='Generate SBOM report for remote repository by url or local directory by path.',
18+
)
19+
@click.option(
20+
'--format',
21+
'-f',
22+
help='SBOM format.',
23+
type=click.Choice(config['scans']['supported_sbom_formats']),
24+
required=True,
25+
)
26+
@click.option(
27+
'--output-format',
28+
'-o',
29+
default='json',
30+
help='Specify the output file format (the default is json).',
31+
type=click.Choice(['json']),
32+
required=False,
33+
)
34+
@click.option(
35+
'--output-file',
36+
help='Output file (the default is autogenerated filename saved to the current directory).',
37+
default=None,
38+
type=click.Path(resolve_path=True, writable=True, path_type=pathlib.Path),
39+
required=False,
40+
)
41+
@click.option(
42+
'--include-vulnerabilities',
43+
is_flag=True,
44+
default=False,
45+
help='Include vulnerabilities.',
46+
type=bool,
47+
required=False,
48+
)
49+
@click.option(
50+
'--include-dev-dependencies',
51+
is_flag=True,
52+
default=False,
53+
help='Include dev dependencies.',
54+
type=bool,
55+
required=False,
56+
)
57+
@click.pass_context
58+
def sbom_command(
59+
context: click.Context,
60+
format: str,
61+
output_format: Optional[str],
62+
output_file: Optional[pathlib.Path],
63+
include_vulnerabilities: bool,
64+
include_dev_dependencies: bool,
65+
) -> int:
66+
"""Generate SBOM report."""
67+
sbom_format_parts = format.split('-')
68+
if len(sbom_format_parts) != 2:
69+
raise click.ClickException('Invalid SBOM format.')
70+
71+
sbom_format, sbom_format_version = sbom_format_parts
72+
73+
report_parameters = ReportParameters(
74+
entity_type='SbomCli',
75+
sbom_report_type=sbom_format,
76+
sbom_version=sbom_format_version,
77+
output_format=output_format,
78+
include_vulnerabilities=include_vulnerabilities,
79+
include_dev_dependencies=include_dev_dependencies,
80+
)
81+
context.obj['report_parameters'] = report_parameters
82+
context.obj['output_file'] = output_file
83+
84+
return 1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import time
2+
3+
import click
4+
5+
from cycode.cli import consts
6+
from cycode.cli.commands.report.sbom.common import create_sbom_report, send_report_feedback
7+
from cycode.cli.commands.report.sbom.handle_errors import handle_report_exception
8+
from cycode.cli.files_collector.path_documents import get_relevant_document
9+
from cycode.cli.files_collector.sca.sca_code_scanner import perform_pre_scan_documents_actions
10+
from cycode.cli.files_collector.zip_documents import zip_documents
11+
from cycode.cli.utils.progress_bar import SbomReportProgressBarSection
12+
13+
14+
@click.command(short_help='Generate SBOM report for provided path in the command.')
15+
@click.argument('path', nargs=1, type=click.Path(exists=True, resolve_path=True), required=True)
16+
@click.pass_context
17+
def sbom_path_command(context: click.Context, path: str) -> None:
18+
client = context.obj['client']
19+
report_parameters = context.obj['report_parameters']
20+
output_format = report_parameters.output_format
21+
output_file = context.obj['output_file']
22+
23+
progress_bar = context.obj['progress_bar']
24+
progress_bar.start()
25+
26+
start_scan_time = time.time()
27+
report_execution_id = -1
28+
29+
try:
30+
documents = get_relevant_document(
31+
progress_bar, SbomReportProgressBarSection.PREPARE_LOCAL_FILES, consts.SCA_SCAN_TYPE, path
32+
)
33+
# TODO(MarshalX): combine perform_pre_scan_documents_actions with get_relevant_document.
34+
# unhardcode usage of context in perform_pre_scan_documents_actions
35+
perform_pre_scan_documents_actions(context, consts.SCA_SCAN_TYPE, documents)
36+
37+
zipped_documents = zip_documents(consts.SCA_SCAN_TYPE, documents)
38+
report_execution = client.request_sbom_report_execution(report_parameters, zip_file=zipped_documents)
39+
report_execution_id = report_execution.id
40+
41+
create_sbom_report(progress_bar, client, report_execution_id, output_file, output_format)
42+
43+
send_report_feedback(
44+
client=client,
45+
start_scan_time=start_scan_time,
46+
report_type='SBOM',
47+
report_command_type='path',
48+
request_report_parameters=report_parameters.to_dict(without_entity_type=False),
49+
report_execution_id=report_execution_id,
50+
request_zip_file_size=zipped_documents.size,
51+
)
52+
except Exception as e:
53+
progress_bar.stop()
54+
55+
send_report_feedback(
56+
client=client,
57+
start_scan_time=start_scan_time,
58+
report_type='SBOM',
59+
report_command_type='path',
60+
request_report_parameters=report_parameters.to_dict(without_entity_type=False),
61+
report_execution_id=report_execution_id,
62+
error_message=str(e),
63+
)
64+
65+
handle_report_exception(context, e)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import os
2+
import pathlib
3+
import re
4+
from typing import Optional
5+
6+
import click
7+
8+
9+
class SbomReportFile:
10+
def __init__(self, storage_path: str, output_format: str, output_file: Optional[pathlib.Path]) -> None:
11+
if output_file is None:
12+
output_file = pathlib.Path(storage_path)
13+
14+
output_ext = f'.{output_format}'
15+
if output_file.suffix != output_ext:
16+
output_file = output_file.with_suffix(output_ext)
17+
18+
self._file_path = output_file
19+
20+
def is_exists(self) -> bool:
21+
return self._file_path.exists()
22+
23+
def _prompt_overwrite(self) -> bool:
24+
return click.confirm(f'File {self._file_path} already exists. Save with a different filename?', default=True)
25+
26+
def _write(self, content: str) -> None:
27+
with open(self._file_path, 'w', encoding='UTF-8') as f:
28+
f.write(content)
29+
30+
def _notify_about_saved_file(self) -> None:
31+
click.echo(f'Report saved to {self._file_path}')
32+
33+
def _find_and_set_unique_filename(self) -> None:
34+
attempt_no = 0
35+
while self.is_exists():
36+
attempt_no += 1
37+
38+
base, ext = os.path.splitext(self._file_path)
39+
# Remove previous suffix
40+
base = re.sub(r'-\d+$', '', base)
41+
42+
self._file_path = pathlib.Path(f'{base}-{attempt_no}{ext}')
43+
44+
def write(self, content: str) -> None:
45+
if self.is_exists() and self._prompt_overwrite():
46+
self._find_and_set_unique_filename()
47+
48+
self._write(content)
49+
self._notify_about_saved_file()

0 commit comments

Comments
 (0)