Skip to content

Refactor to use Dependency structure #564

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 33 additions & 107 deletions guarddog/cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
""" Package Malware Scanner
"""Package Malware Scanner

CLI command that scans a package version for user-specified malware flags.
Includes rules based on package registry metadata and source code analysis.
"""

from functools import reduce
import json as js
import logging
import os
import sys
Expand All @@ -14,12 +13,12 @@

import click
from prettytable import PrettyTable
from termcolor import colored

from guarddog.analyzer.metadata import get_metadata_detectors
from guarddog.analyzer.sourcecode import get_sourcecode_rules
from guarddog.ecosystems import ECOSYSTEM
from guarddog.reporters.sarif import report_verify_sarif
from guarddog.reporters.reporter_factory import ReporterFactory, ReporterType

from guarddog.scanners import get_package_scanner, get_project_scanner
from guarddog.utils.archives import safe_extract

Expand Down Expand Up @@ -127,7 +126,7 @@ def _get_all_rules(ecosystem: ECOSYSTEM) -> set[str]:

def _get_rule_param(
rules: tuple[str, ...], exclude_rules: tuple[str, ...], ecosystem: ECOSYSTEM
) -> Optional[set]:
) -> Optional[set[str]]:
"""
This function should return None if no rules are provided
Else a set of rules to be used for scanning
Expand Down Expand Up @@ -162,28 +161,20 @@ def _verify(
log.error(f"Command verify is not supported for ecosystem {ecosystem}")
exit(1)

def display_result(result: dict) -> None:
identifier = (
result["dependency"]
if result["version"] is None
else f"{result['dependency']} version {result['version']}"
)
if output_format is None:
print_scan_results(result.get("result"), identifier)

if len(result.get("errors", [])) > 0:
print_errors(result.get("error"), identifier)
dependencies, results = scanner.scan_local(path=path, rules=rule_param)

results = scanner.scan_local(path, rule_param, display_result)
if output_format == "json":
return_value = js.dumps(results)
rule_docs = list(rule_param or _get_all_rules(ecosystem=ecosystem))

if output_format == "sarif":
sarif_rules = _get_all_rules(ecosystem)
return_value = report_verify_sarif(path, list(sarif_rules), results, ecosystem)
reporter = ReporterFactory.create_reporter(ReporterType.from_str(output_format))
stdout, stderr = reporter.render_verify(
dependency_files=dependencies,
rule_names=rule_docs,
scan_results=results,
ecosystem=ecosystem,
)

if output_format is not None:
print(return_value)
sys.stdout.write(stdout)
sys.stderr.write(stderr)

if exit_non_zero_on_finding:
exit_with_status_code([result["result"] for result in results])
Expand Down Expand Up @@ -231,10 +222,10 @@ def _scan(
log.error(f"Error occurred while scanning target {identifier}: '{e}'\n")
sys.exit(1)

if output_format == "json":
print(js.dumps(result))
else:
print_scan_results(result, result["package"])
reporter = ReporterFactory.create_reporter(ReporterType.from_str(output_format))
stdout, stderr = reporter.render_scan(result)
sys.stdout.write(stdout)
sys.stderr.write(stderr)

if exit_non_zero_on_finding:
exit_with_status_code([result])
Expand Down Expand Up @@ -262,6 +253,7 @@ class CliEcosystem(click.Group):
Class that dynamically represents an ecosystem in click
It dynamically selects the ruleset to the instantiated ecosystem
"""

def __init__(self, ecosystem: ECOSYSTEM):
super().__init__()
self.name = ecosystem.name.lower()
Expand All @@ -288,7 +280,12 @@ def rule_options(fn):
@scan_options
@rule_options
def scan_ecosystem(
target, version, rules, exclude_rules, output_format, exit_non_zero_on_finding
target,
version,
rules,
exclude_rules,
output_format,
exit_non_zero_on_finding,
):
return _scan(
target,
Expand All @@ -304,7 +301,9 @@ def scan_ecosystem(
@common_options
@verify_options
@rule_options
def verify_ecosystem(target, rules, exclude_rules, output_format, exit_non_zero_on_finding):
def verify_ecosystem(
target, rules, exclude_rules, output_format, exit_non_zero_on_finding
):
return _verify(
target,
rules,
Expand All @@ -314,7 +313,9 @@ def verify_ecosystem(target, rules, exclude_rules, output_format, exit_non_zero_
self.ecosystem,
)

@click.command("list-rules", help=f"List available rules for {self.ecosystem.name}")
@click.command(
"list-rules", help=f"List available rules for {self.ecosystem.name}"
)
def list_rules_ecosystem():
return _list_rules(self.ecosystem)

Expand All @@ -333,7 +334,7 @@ def list_rules_ecosystem():
@verify_options
@legacy_rules_options
def verify(target, rules, exclude_rules, output_format, exit_non_zero_on_finding):
return _verify(
return verify(
target,
rules,
exclude_rules,
Expand Down Expand Up @@ -361,81 +362,6 @@ def scan(
)


# Pretty prints scan results for the console
def print_scan_results(results, identifier):
num_issues = results.get("issues")
errors = results.get("errors", [])

if num_issues == 0:
print(
"Found "
+ colored("0 potentially malicious indicators", "green", attrs=["bold"])
+ " scanning "
+ colored(identifier, None, attrs=["bold"])
)
print()
else:
print(
"Found "
+ colored(
str(num_issues) + " potentially malicious indicators",
"red",
attrs=["bold"],
)
+ " in "
+ colored(identifier, None, attrs=["bold"])
)
print()

findings = results.get("results", [])
for finding in findings:
description = findings[finding]
if isinstance(description, str): # package metadata
print(colored(finding, None, attrs=["bold"]) + ": " + description)
print()
elif isinstance(description, list): # semgrep rule result:
source_code_findings = description
print(
colored(finding, None, attrs=["bold"])
+ ": found "
+ str(len(source_code_findings))
+ " source code matches"
)
for finding in source_code_findings:
print(
" * "
+ finding["message"]
+ " at "
+ finding["location"]
+ "\n "
+ format_code_line_for_output(finding["code"])
)
print()

if len(errors) > 0:
print_errors(errors, identifier)
print("\n")


def print_errors(errors, identifier):
print(
colored("Some rules failed to run while scanning " + identifier + ":", "yellow")
)
print()
for rule in errors:
print(f"* {rule}: {errors[rule]}")
print()


def format_code_line_for_output(code):
return " " + colored(
code.strip().replace("\n", "\n ").replace("\t", " "),
None,
"on_red",
attrs=["bold"],
)


# Given the results, exit with the appropriate status code
def exit_with_status_code(results):
for result in results:
Expand Down
28 changes: 28 additions & 0 deletions guarddog/reporters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from guarddog.scanners.scanner import DependencyFile
from typing import List
from guarddog.ecosystems import ECOSYSTEM


class BaseReporter:
"""
Base class for all reporters.
"""

@staticmethod
def render_scan(scan_results: dict) -> tuple[str, str]:
"""
Report the scans results.
"""
raise NotImplementedError("Subclasses must implement this method.")

@staticmethod
def render_verify(
dependency_files: List[DependencyFile],
rule_names: list[str],
scan_results: list[dict],
ecosystem: ECOSYSTEM,
) -> tuple[str, str]:
"""
Report the scans results.
"""
raise NotImplementedError("Subclasses must implement this method.")
138 changes: 138 additions & 0 deletions guarddog/reporters/human_readable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from termcolor import colored
from guarddog.reporters import BaseReporter
from typing import List
from guarddog.scanners.scanner import DependencyFile
from guarddog.ecosystems import ECOSYSTEM


class HumanReadableReporter(BaseReporter):
"""
HumanReadableReporter is a class that formats and prints scan results in a human-readable format.
"""

@staticmethod
def print_errors(identifier: str, results: dict) -> str:
errors = results.get("errors", [])
if not errors:
return ""

lines = []
lines.append("")
lines.append(
colored(
"Some rules failed to run while scanning " + identifier + ":",
"yellow",
)
)
lines.append("")
for rule in errors:
lines.append(f"* {rule}: {errors[rule]}")

return "\n".join(lines)

@staticmethod
def print_scan_results(identifier: str, results: dict) -> str:

def _format_code_line_for_output(code) -> str:
return " " + colored(
code.strip().replace("\n", "\n ").replace("\t", " "),
None,
"on_red",
attrs=["bold"],
)

num_issues = results.get("issues")
lines = []

if num_issues == 0:
lines.append(
"Found "
+ colored("0 potentially malicious indicators", "green", attrs=["bold"])
+ " scanning "
+ colored(identifier, None, attrs=["bold"])
)
lines.append("")
else:
lines.append(
"Found "
+ colored(
str(num_issues) + " potentially malicious indicators",
"red",
attrs=["bold"],
)
+ " in "
+ colored(identifier, None, attrs=["bold"])
)
lines.append("")

findings = results.get("results", [])
for finding in findings:
description = findings[finding]
if isinstance(description, str): # package metadata
lines.append(
colored(finding, None, attrs=["bold"]) + ": " + description
)
lines.append("")
elif isinstance(description, list): # semgrep rule result:
source_code_findings = description
lines.append(
colored(finding, None, attrs=["bold"])
+ ": found "
+ str(len(source_code_findings))
+ " source code matches"
)
for finding in source_code_findings:
lines.append(
" * "
+ finding["message"]
+ " at "
+ finding["location"]
+ "\n "
+ _format_code_line_for_output(finding["code"])
)
lines.append("")

return "\n".join(lines)

@staticmethod
def render_scan(scan_results: dict) -> tuple[str, str]:
"""
Report the scans results in a human-readable format.

Args:
scan_results (dict): The scan results to be reported.
"""
return (
HumanReadableReporter.print_scan_results(
identifier=scan_results["package"], results=scan_results
),
HumanReadableReporter.print_errors(
identifier=scan_results["package"], results=scan_results
),
)

@staticmethod
def render_verify(
dependency_files: List[DependencyFile],
rule_names: list[str],
scan_results: list[dict],
ecosystem: ECOSYSTEM,
) -> tuple[str, str]:
return (
"\n".join(
[
HumanReadableReporter.print_scan_results(
identifier=s["dependency"], results=s["result"]
)
for s in scan_results
]
),
"\n".join(
[
HumanReadableReporter.print_errors(
identifier=s["dependency"], results=s["result"]
)
for s in scan_results
]
),
)
Loading