import json import sys import click from rich.console import Console from rich.table import Table from rich.panel import Panel from rich import print as rprint from .api import VulnersClient console = Console() def get_client(api_key: str | None) -> VulnersClient: client = VulnersClient(api_key=api_key) if not client.api_key: console.print("[yellow]Warning: No API key set. Set VULNERS_API_KEY or use --api-key[/yellow]") return client def print_error(msg: str): console.print(f"[red]Error:[/red] {msg}") sys.exit(1) @click.group(context_settings={"help_option_names": ["-h", "--help"]}) @click.option("--api-key", envvar="VULNERS_API_KEY", help="Vulners API key") @click.pass_context def main(ctx, api_key): """Query CVEs and CPEs from vulners.com""" ctx.ensure_object(dict) ctx.obj["api_key"] = api_key def build_query( base: str, type_: str | None, cvss_min: float | None, cvss_max: float | None, after: str | None, before: str | None, vendor: str | None, product: str | None, version: str | None, order: str | None, ) -> str: parts = [base] if base else [] if type_: parts.append(f"type:{type_}") if cvss_min is not None and cvss_max is not None: parts.append(f"cvss.score:[{cvss_min} TO {cvss_max}]") elif cvss_min is not None: parts.append(f"cvss.score:[{cvss_min} TO 10]") elif cvss_max is not None: parts.append(f"cvss.score:[0 TO {cvss_max}]") if after: parts.append(f"published:[{after} TO *]") if before: parts.append(f"published:[* TO {before}]") if vendor: parts.append(f"affectedSoftware.vendor:{vendor}") if product: parts.append(f"affectedSoftware.name:{product}") if version: parts.append(f"affectedSoftware.version:{version}") query = " AND ".join(parts) if parts else "*" if order: query += f" order:{order}" return query @main.command() @click.argument("query", default="") @click.option("-n", "--limit", default=10, help="Number of results") @click.option("--skip", default=0, help="Skip first N results") @click.option("-t", "--type", "type_", help="Filter by type (cve, exploit, nessus, etc.)") @click.option("--cvss-min", type=float, help="Minimum CVSS score") @click.option("--cvss-max", type=float, help="Maximum CVSS score") @click.option("--after", help="Published after date (YYYY-MM-DD)") @click.option("--before", help="Published before date (YYYY-MM-DD)") @click.option("--vendor", help="Filter by software vendor") @click.option("--product", help="Filter by product name") @click.option("--version", help="Filter by version") @click.option("--order", type=click.Choice(["published", "cvss"]), help="Order results") @click.option("--full", "-f", is_flag=True, help="Show full descriptions") @click.option("--json", "as_json", is_flag=True, help="Output as JSON") @click.pass_context def search(ctx, query, limit, skip, type_, cvss_min, cvss_max, after, before, vendor, product, version, order, full, as_json): """Search vulnerabilities using Lucene query syntax""" client = get_client(ctx.obj["api_key"]) final_query = build_query(query, type_, cvss_min, cvss_max, after, before, vendor, product, version, order) try: result = client.search(final_query, size=limit, skip=skip) except Exception as e: print_error(str(e)) if as_json: click.echo(json.dumps(result, indent=2)) return data = result.get("data", {}) if data.get("total", 0) == 0: console.print("[yellow]No results found[/yellow]") return console.print(f"[green]Found {data['total']} results[/green]\n") for doc in data.get("search", []): src = doc.get("_source", {}) title = src.get("title", src.get("id", "Unknown")) doc_type = src.get("type", "unknown") cvss = src.get("cvss", {}).get("score", "-") published = src.get("published", "-")[:10] if src.get("published") else "-" raw_desc = src.get("description", "") truncated = not full and len(raw_desc) > 200 desc = raw_desc if full else raw_desc[:200] panel = Panel( f"{desc}..." if truncated else (desc or "No description"), title=f"[bold]{title}[/bold]", subtitle=f"Type: {doc_type} | CVSS: {cvss} | Published: {published}", border_style="dim", ) console.print(panel) console.print() @main.command() @click.argument("cve_id") @click.option("--json", "as_json", is_flag=True, help="Output as JSON") @click.pass_context def cve(ctx, cve_id, as_json): """Get details for a specific CVE (e.g., CVE-2021-44228)""" client = get_client(ctx.obj["api_key"]) cve_id = cve_id.upper() try: result = client.get_cve(cve_id) except Exception as e: print_error(str(e)) if as_json: click.echo(json.dumps(result, indent=2)) return docs = result.get("data", {}).get("search", []) if not docs: console.print(f"[yellow]CVE {cve_id} not found[/yellow]") return src = docs[0].get("_source", {}) title = src.get("title", cve_id) cvss = src.get("cvss", {}) desc = src.get("description", "No description available") published = src.get("published", "Unknown")[:10] if src.get("published") else "Unknown" refs = src.get("references", []) cpes = src.get("cpe", []) console.print(Panel(f"[bold cyan]{title}[/bold cyan]", expand=False, border_style="dim")) console.print(f"\n[bold]CVSS Score:[/bold] {cvss.get('score', 'N/A')} ({cvss.get('vector', 'N/A')})") console.print(f"[bold]Published:[/bold] {published}") console.print(f"\n[bold]Description:[/bold]\n{desc}") if cpes: console.print(f"\n[bold]Affected CPEs:[/bold]") for cpe in cpes[:10]: console.print(f" • {cpe}") if len(cpes) > 10: console.print(f" [dim]... and {len(cpes) - 10} more[/dim]") if refs: console.print(f"\n[bold]References:[/bold]") for ref in refs[:5]: console.print(f" • {ref}") if len(refs) > 5: console.print(f" [dim]... and {len(refs) - 5} more[/dim]") @main.command() @click.option("--vendor", "-v", help="CPE vendor name") @click.option("--product", "-p", help="CPE product name") @click.option("--version", help="CPE version") @click.option("-n", "--limit", default=10, help="Number of results") @click.option("--json", "as_json", is_flag=True, help="Output as JSON") @click.pass_context def cpe(ctx, vendor, product, version, limit, as_json): """Search vulnerabilities by CPE attributes""" if not any([vendor, product, version]): print_error("At least one of --vendor, --product, or --version required") client = get_client(ctx.obj["api_key"]) try: result = client.search_cpe(vendor=vendor, product=product, version=version, size=limit) except Exception as e: print_error(str(e)) if as_json: click.echo(json.dumps(result, indent=2)) return data = result.get("data", {}) total = data.get("total", 0) if total == 0: console.print("[yellow]No results found[/yellow]") return console.print(f"[green]Found {total} vulnerabilities[/green]\n") table = Table(show_header=True, border_style="dim") table.add_column("ID", style="cyan") table.add_column("Title", max_width=50) table.add_column("CVSS", justify="right") table.add_column("Published") for doc in data.get("search", []): src = doc.get("_source", {}) table.add_row( src.get("id", "-"), (src.get("title", "-")[:47] + "...") if len(src.get("title", "")) > 50 else src.get("title", "-"), str(src.get("cvss", {}).get("score", "-")), src.get("published", "-")[:10] if src.get("published") else "-", ) console.print(table) @main.command() @click.argument("software") @click.argument("version", required=False) @click.option("-n", "--limit", default=20, help="Max vulnerabilities") @click.option("--json", "as_json", is_flag=True, help="Output as JSON") @click.pass_context def software(ctx, software, version, limit, as_json): """Find vulnerabilities for a software (e.g., apache 2.4.49)""" client = get_client(ctx.obj["api_key"]) try: result = client.software_vulns(software, version, max_vulns=limit) except Exception as e: print_error(str(e)) if as_json: click.echo(json.dumps(result, indent=2)) return data = result.get("data", {}) vulns = data.get("search", []) or data.get("vulnerabilities", []) name = f"{software} {version}" if version else software if not vulns: console.print(f"[yellow]No vulnerabilities found for {name}[/yellow]") return console.print(f"[green]Found {len(vulns)} vulnerabilities for {name}[/green]\n") table = Table(show_header=True, border_style="dim") table.add_column("CVE", style="cyan") table.add_column("Title", max_width=50) table.add_column("CVSS", justify="right") for vuln in vulns: if isinstance(vuln, dict): src = vuln.get("_source", vuln) cve_id = src.get("id", "-") title = src.get("title", "-") cvss = src.get("cvss", {}).get("score", "-") if isinstance(src.get("cvss"), dict) else "-" else: cve_id = str(vuln) title = "-" cvss = "-" table.add_row( cve_id, (title[:47] + "...") if len(title) > 50 else title, str(cvss), ) console.print(table) if __name__ == "__main__": main()