diff options
Diffstat (limited to 'src/nvd/cli/commands')
| -rw-r--r-- | src/nvd/cli/commands/__init__.py | 1 | ||||
| -rw-r--r-- | src/nvd/cli/commands/config.py | 95 | ||||
| -rw-r--r-- | src/nvd/cli/commands/cpe.py | 170 | ||||
| -rw-r--r-- | src/nvd/cli/commands/cve.py | 150 |
4 files changed, 416 insertions, 0 deletions
diff --git a/src/nvd/cli/commands/__init__.py b/src/nvd/cli/commands/__init__.py new file mode 100644 index 0000000..3f4c467 --- /dev/null +++ b/src/nvd/cli/commands/__init__.py @@ -0,0 +1 @@ +"""CLI commands package.""" diff --git a/src/nvd/cli/commands/config.py b/src/nvd/cli/commands/config.py new file mode 100644 index 0000000..46da597 --- /dev/null +++ b/src/nvd/cli/commands/config.py @@ -0,0 +1,95 @@ +"""Configuration CLI commands.""" + +import os +import sys +from pathlib import Path +from typing import Optional + +import typer +import yaml +from rich.console import Console +from rich.table import Table + +CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]} + +app = typer.Typer( + no_args_is_help=True, + help="Manage nvdb configuration (API keys, settings)", + context_settings=CONTEXT_SETTINGS, + epilog="Examples:\n" + " nvdb config show\n" + " nvdb config set-api-key YOUR_API_KEY\n" + " nvdb config clear", +) + +# Console for stderr (status messages, errors) +console = Console(stderr=True) + +CONFIG_DIR = Path.home() / ".config" / "nvd" +CONFIG_FILE = CONFIG_DIR / "config.yaml" + + +def load_config() -> dict: + """Load configuration from file.""" + if not CONFIG_FILE.exists(): + return {} + with open(CONFIG_FILE) as f: + return yaml.safe_load(f) or {} + + +def save_config(config: dict) -> None: + """Save configuration to file.""" + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + with open(CONFIG_FILE, "w") as f: + yaml.dump(config, f) + + +@app.command("set-api-key") +def set_api_key( + api_key: str = typer.Argument(..., help="Your NVD API key"), +) -> None: + """Set the NVD API key in config file.""" + config = load_config() + config["api_key"] = api_key + save_config(config) + console.print(f"[green]API key saved to {CONFIG_FILE}[/green]") + console.print("[yellow]Tip: You can also set the NVD_API_KEY environment variable[/yellow]") + + +@app.command("show") +def show_config() -> None: + """Show current configuration.""" + config = load_config() + env_api_key = os.getenv("NVD_API_KEY") + + table = Table(title="NVD API Configuration", show_header=True, header_style="bold magenta") + table.add_column("Setting", style="cyan") + table.add_column("Value", style="green") + table.add_column("Source", style="yellow") + + if config.get("api_key"): + masked_key = config["api_key"][:8] + "..." if len(config["api_key"]) > 8 else "***" + table.add_row("API Key", masked_key, f"Config file ({CONFIG_FILE})") + elif env_api_key: + masked_key = env_api_key[:8] + "..." if len(env_api_key) > 8 else "***" + table.add_row("API Key", masked_key, "Environment variable") + else: + table.add_row("API Key", "Not set", "N/A") + + console.print(table) + + if not config.get("api_key") and not env_api_key: + console.print("\n[yellow]No API key configured. Using unauthenticated access (5 req/30s)[/yellow]") + console.print("[blue]Set an API key for higher rate limits (50 req/30s):[/blue]") + console.print(" nvdb config set-api-key YOUR_KEY") + console.print(" or export NVD_API_KEY=YOUR_KEY") + + +@app.command("clear") +def clear_config() -> None: + """Clear saved configuration.""" + if CONFIG_FILE.exists(): + CONFIG_FILE.unlink() + console.print("[green]Configuration cleared[/green]") + else: + console.print("[yellow]No configuration file found[/yellow]") diff --git a/src/nvd/cli/commands/cpe.py b/src/nvd/cli/commands/cpe.py new file mode 100644 index 0000000..16a6cbc --- /dev/null +++ b/src/nvd/cli/commands/cpe.py @@ -0,0 +1,170 @@ +"""CPE CLI commands.""" + +import asyncio +import sys +from typing import Optional + +import typer +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn + +from ...client import NVDClient +from ..formatters import ( + format_cpe_table, + format_json, + format_json_lines, + format_match_criteria_table, + format_yaml, +) + +CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]} + +app = typer.Typer( + no_args_is_help=True, + help="Query CPE (product) information from the NVD", + context_settings=CONTEXT_SETTINGS, + epilog="Examples:\n" + " # Keyword search (recommended for most searches)\n" + " nvdb cpe search --keyword 'apache'\n" + " nvdb cpe search --keyword 'windows 10'\n\n" + " # CPE match string (requires cpe:2.3 format)\n" + " nvdb cpe search --match-string 'cpe:2.3:a:microsoft:*'\n" + " nvdb cpe search --match-string 'cpe:2.3:o:linux:*'\n\n" + " # Get match criteria for a CVE\n" + " nvdb cpe matches --cve CVE-2021-44228", +) + +# Console for stderr (progress, errors, warnings) +console = Console(stderr=True) + + +@app.command("get") +def get_cpe( + cpe_id: str = typer.Argument(..., help="CPE Name UUID"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Get details for a specific CPE by UUID.""" + + async def _get() -> None: + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + progress.add_task(f"Fetching CPE {cpe_id}...", total=None) + cpe = await client.cpe.get_cpe(cpe_id) + + if output_format == "json": + format_json(cpe) + elif output_format == "yaml": + format_yaml(cpe) + else: + format_cpe_table([cpe]) + + asyncio.run(_get()) + + +@app.command("search") +def search_cpes( + keyword: Optional[str] = typer.Option(None, "--keyword", "-k", help="Keyword to search in CPE titles (e.g., 'windows' or 'apache')"), + match_string: Optional[str] = typer.Option(None, "--match-string", "-m", help="CPE match string in cpe:2.3 format (e.g., 'cpe:2.3:a:vendor:product:*')"), + limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Search for CPEs. + + Use --keyword for simple text search in product names. + Use --match-string for CPE formatted strings (cpe:2.3:...). + """ + + async def _search() -> None: + from ...exceptions import NotFoundError + + results = [] + try: + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + task = progress.add_task("Searching CPEs...", total=None) + + async for cpe in client.cpe.search_cpes( + keyword_search=keyword, + cpe_match_string=match_string, + ): + results.append(cpe) + if len(results) >= limit: + break + progress.update(task, description=f"Searching CPEs... ({len(results)} found)") + + if not results: + console.print("[yellow]No CPEs found matching criteria[/yellow]") + return + + if output_format == "json": + format_json_lines(results) + elif output_format == "yaml": + for cpe in results: + format_yaml(cpe) + console.print("---") + else: + format_cpe_table(results) + except NotFoundError as e: + console.print(f"[red]Error:[/red] {e.message}") + if match_string: + console.print("\n[yellow]Tip:[/yellow] Use --keyword for text search, or use CPE format for --match-string") + console.print("Example: [blue]nvdb cpe search --keyword 'soft-serve'[/blue]") + console.print("Or: [blue]nvdb cpe search --match-string 'cpe:2.3:a:*:soft*'[/blue]") + raise typer.Exit(1) + + asyncio.run(_search()) + + +@app.command("matches") +def get_matches( + cve_id: Optional[str] = typer.Option(None, "--cve", help="CVE ID to get match criteria for"), + match_criteria_id: Optional[str] = typer.Option(None, "--id", help="Specific match criteria UUID"), + limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Get CPE match criteria.""" + + async def _matches() -> None: + results = [] + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + task = progress.add_task("Fetching match criteria...", total=None) + + async for match in client.cpematch.search_match_criteria( + cve_id=cve_id, + match_criteria_id=match_criteria_id, + ): + results.append(match) + if len(results) >= limit: + break + progress.update(task, description=f"Fetching match criteria... ({len(results)} found)") + + if not results: + console.print("[yellow]No match criteria found[/yellow]") + return + + if output_format == "json": + format_json_lines(results) + elif output_format == "yaml": + for match in results: + format_yaml(match) + console.print("---") + else: + format_match_criteria_table(results) + + asyncio.run(_matches()) diff --git a/src/nvd/cli/commands/cve.py b/src/nvd/cli/commands/cve.py new file mode 100644 index 0000000..3f28ac7 --- /dev/null +++ b/src/nvd/cli/commands/cve.py @@ -0,0 +1,150 @@ +"""CVE CLI commands.""" + +import asyncio +import sys +from typing import Optional + +import typer +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn + +from ...client import NVDClient +from ..formatters import format_cve_table, format_json, format_json_lines, format_yaml + +CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]} + +app = typer.Typer( + no_args_is_help=True, + help="Query CVE (vulnerability) information from the NVD", + context_settings=CONTEXT_SETTINGS, + epilog="Examples:\n" + " nvdb cve get CVE-2021-44228\n" + " nvdb cve search --keyword 'sql injection' --severity HIGH\n" + " nvdb cve search --has-kev --limit 20\n" + " nvdb cve history CVE-2021-44228", +) + +# Console for stderr (progress, errors, warnings) +console = Console(stderr=True) + + +@app.command("get") +def get_cve( + cve_id: str = typer.Argument(..., help="CVE ID (e.g., CVE-2021-44228)"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Get details for a specific CVE.""" + + async def _get() -> None: + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + progress.add_task(f"Fetching {cve_id}...", total=None) + cve = await client.cve.get_cve(cve_id) + + if output_format == "json": + format_json(cve) + elif output_format == "yaml": + format_yaml(cve) + else: + format_cve_table([cve]) + + asyncio.run(_get()) + + +@app.command("search") +def search_cves( + keyword: Optional[str] = typer.Option(None, "--keyword", "-k", help="Keyword to search in descriptions"), + cpe: Optional[str] = typer.Option(None, "--cpe", help="CPE name to filter by"), + severity: Optional[str] = typer.Option(None, "--severity", "-s", help="CVSS v3 severity: LOW, MEDIUM, HIGH, CRITICAL"), + cvss_v2_severity: Optional[str] = typer.Option(None, "--cvss-v2-severity", help="CVSS v2 severity"), + cvss_v3_severity: Optional[str] = typer.Option(None, "--cvss-v3-severity", help="CVSS v3 severity"), + cwe_id: Optional[str] = typer.Option(None, "--cwe", help="CWE ID (e.g., CWE-79)"), + has_kev: bool = typer.Option(False, "--has-kev", help="Only CVEs in CISA KEV catalog"), + pub_start: Optional[str] = typer.Option(None, "--pub-start", help="Publication start date (ISO-8601)"), + pub_end: Optional[str] = typer.Option(None, "--pub-end", help="Publication end date (ISO-8601)"), + mod_start: Optional[str] = typer.Option(None, "--mod-start", help="Last modified start date (ISO-8601)"), + mod_end: Optional[str] = typer.Option(None, "--mod-end", help="Last modified end date (ISO-8601)"), + limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Search for CVEs with various filters.""" + + async def _search() -> None: + results = [] + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + task = progress.add_task("Searching CVEs...", total=None) + + async for cve in client.cve.search_cves( + keyword_search=keyword, + cpe_name=cpe, + cvss_v2_severity=cvss_v2_severity, + cvss_v3_severity=cvss_v3_severity or severity, + cwe_id=cwe_id, + has_kev=has_kev if has_kev else None, + pub_start_date=pub_start, + pub_end_date=pub_end, + last_mod_start_date=mod_start, + last_mod_end_date=mod_end, + ): + results.append(cve) + if len(results) >= limit: + break + progress.update(task, description=f"Searching CVEs... ({len(results)} found)") + + if not results: + console.print("[yellow]No CVEs found matching criteria[/yellow]") + return + + if output_format == "json": + format_json_lines(results) + elif output_format == "yaml": + for cve in results: + format_yaml(cve) + console.print("---") + else: + format_cve_table(results) + + asyncio.run(_search()) + + +@app.command("history") +def get_history( + cve_id: str = typer.Argument(..., help="CVE ID to get history for"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("json", "--format", "-f", help="Output format: json, yaml"), +) -> None: + """Get change history for a CVE.""" + + async def _history() -> None: + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + progress.add_task(f"Fetching history for {cve_id}...", total=None) + history = await client.history.get_cve_history(cve_id) + + if not history: + console.print(f"[yellow]No history found for {cve_id}[/yellow]") + return + + if output_format == "yaml": + for change in history: + format_yaml(change) + console.print("---") + else: + format_json_lines(history) + + asyncio.run(_history()) |
