diff options
Diffstat (limited to 'src/vulners')
| -rw-r--r-- | src/vulners/__init__.py | 3 | ||||
| -rw-r--r-- | src/vulners/api.py | 74 | ||||
| -rw-r--r-- | src/vulners/cli.py | 282 |
3 files changed, 359 insertions, 0 deletions
diff --git a/src/vulners/__init__.py b/src/vulners/__init__.py new file mode 100644 index 0000000..eee1892 --- /dev/null +++ b/src/vulners/__init__.py @@ -0,0 +1,3 @@ +"""Vulners CLI - Query CVEs and CPEs from vulners.com""" + +__version__ = "0.1.0" diff --git a/src/vulners/api.py b/src/vulners/api.py new file mode 100644 index 0000000..2c0fdbd --- /dev/null +++ b/src/vulners/api.py @@ -0,0 +1,74 @@ +import os +from dataclasses import dataclass +from typing import Any + +import httpx + +BASE_URL = "https://vulners.com/api/v3" + + +@dataclass +class VulnersClient: + api_key: str | None = None + timeout: float = 30.0 + + def __post_init__(self): + if not self.api_key: + self.api_key = os.environ.get("VULNERS_API_KEY") + + def _request(self, endpoint: str, payload: dict[str, Any]) -> dict[str, Any]: + headers = {"Content-Type": "application/json"} + if self.api_key: + headers["X-Api-Key"] = self.api_key + + with httpx.Client(timeout=self.timeout) as client: + resp = client.post(f"{BASE_URL}{endpoint}", headers=headers, json=payload) + resp.raise_for_status() + return resp.json() + + def search( + self, + query: str, + size: int = 10, + skip: int = 0, + fields: list[str] | None = None, + ) -> dict[str, Any]: + payload = {"query": query, "size": size, "skip": skip} + if fields: + payload["fields"] = fields + return self._request("/search/lucene/", payload) + + def get_cve(self, cve_id: str) -> dict[str, Any]: + return self.search(f"id:{cve_id}", size=1) + + def search_cpe( + self, + vendor: str | None = None, + product: str | None = None, + version: str | None = None, + size: int = 10, + ) -> dict[str, Any]: + parts = [] + if vendor: + parts.append(f"cpe.vendor:{vendor}") + if product: + parts.append(f"cpe.product:{product}") + if version: + parts.append(f"cpe.version:{version}") + query = " AND ".join(parts) if parts else "*" + return self.search(query, size=size) + + def software_vulns( + self, + software: str, + version: str | None = None, + max_vulns: int = 20, + ) -> dict[str, Any]: + payload = { + "software": software, + "type": "software", + "maxVulnerabilities": max_vulns, + } + if version: + payload["version"] = version + return self._request("/burp/softwareapi/", payload) diff --git a/src/vulners/cli.py b/src/vulners/cli.py new file mode 100644 index 0000000..a5d954a --- /dev/null +++ b/src/vulners/cli.py @@ -0,0 +1,282 @@ +import json +import sys + +import click +from rich.console import Console +from rich.table import Table +from rich.panel import Panel +from rich import print as rprint + +from .api import VulnersClient + +console = Console() + + +def get_client(api_key: str | None) -> VulnersClient: + client = VulnersClient(api_key=api_key) + if not client.api_key: + console.print("[yellow]Warning: No API key set. Set VULNERS_API_KEY or use --api-key[/yellow]") + return client + + +def print_error(msg: str): + console.print(f"[red]Error:[/red] {msg}") + sys.exit(1) + + +@click.group(context_settings={"help_option_names": ["-h", "--help"]}) +@click.option("--api-key", envvar="VULNERS_API_KEY", help="Vulners API key") +@click.pass_context +def main(ctx, api_key): + """Query CVEs and CPEs from vulners.com""" + ctx.ensure_object(dict) + ctx.obj["api_key"] = api_key + + +def build_query( + base: str, + type_: str | None, + cvss_min: float | None, + cvss_max: float | None, + after: str | None, + before: str | None, + vendor: str | None, + product: str | None, + version: str | None, + order: str | None, +) -> str: + parts = [base] if base else [] + if type_: + parts.append(f"type:{type_}") + if cvss_min is not None and cvss_max is not None: + parts.append(f"cvss.score:[{cvss_min} TO {cvss_max}]") + elif cvss_min is not None: + parts.append(f"cvss.score:[{cvss_min} TO 10]") + elif cvss_max is not None: + parts.append(f"cvss.score:[0 TO {cvss_max}]") + if after: + parts.append(f"published:[{after} TO *]") + if before: + parts.append(f"published:[* TO {before}]") + if vendor: + parts.append(f"affectedSoftware.vendor:{vendor}") + if product: + parts.append(f"affectedSoftware.name:{product}") + if version: + parts.append(f"affectedSoftware.version:{version}") + query = " AND ".join(parts) if parts else "*" + if order: + query += f" order:{order}" + return query + + +@main.command() +@click.argument("query", default="") +@click.option("-n", "--limit", default=10, help="Number of results") +@click.option("--skip", default=0, help="Skip first N results") +@click.option("-t", "--type", "type_", help="Filter by type (cve, exploit, nessus, etc.)") +@click.option("--cvss-min", type=float, help="Minimum CVSS score") +@click.option("--cvss-max", type=float, help="Maximum CVSS score") +@click.option("--after", help="Published after date (YYYY-MM-DD)") +@click.option("--before", help="Published before date (YYYY-MM-DD)") +@click.option("--vendor", help="Filter by software vendor") +@click.option("--product", help="Filter by product name") +@click.option("--version", help="Filter by version") +@click.option("--order", type=click.Choice(["published", "cvss"]), help="Order results") +@click.option("--full", "-f", is_flag=True, help="Show full descriptions") +@click.option("--json", "as_json", is_flag=True, help="Output as JSON") +@click.pass_context +def search(ctx, query, limit, skip, type_, cvss_min, cvss_max, after, before, vendor, product, version, order, full, as_json): + """Search vulnerabilities using Lucene query syntax""" + client = get_client(ctx.obj["api_key"]) + final_query = build_query(query, type_, cvss_min, cvss_max, after, before, vendor, product, version, order) + try: + result = client.search(final_query, size=limit, skip=skip) + except Exception as e: + print_error(str(e)) + + if as_json: + click.echo(json.dumps(result, indent=2)) + return + + data = result.get("data", {}) + if data.get("total", 0) == 0: + console.print("[yellow]No results found[/yellow]") + return + + console.print(f"[green]Found {data['total']} results[/green]\n") + + for doc in data.get("search", []): + src = doc.get("_source", {}) + title = src.get("title", src.get("id", "Unknown")) + doc_type = src.get("type", "unknown") + cvss = src.get("cvss", {}).get("score", "-") + published = src.get("published", "-")[:10] if src.get("published") else "-" + raw_desc = src.get("description", "") + truncated = not full and len(raw_desc) > 200 + desc = raw_desc if full else raw_desc[:200] + + panel = Panel( + f"{desc}..." if truncated else (desc or "No description"), + title=f"[bold]{title}[/bold]", + subtitle=f"Type: {doc_type} | CVSS: {cvss} | Published: {published}", + border_style="dim", + ) + console.print(panel) + console.print() + + +@main.command() +@click.argument("cve_id") +@click.option("--json", "as_json", is_flag=True, help="Output as JSON") +@click.pass_context +def cve(ctx, cve_id, as_json): + """Get details for a specific CVE (e.g., CVE-2021-44228)""" + client = get_client(ctx.obj["api_key"]) + cve_id = cve_id.upper() + + try: + result = client.get_cve(cve_id) + except Exception as e: + print_error(str(e)) + + if as_json: + click.echo(json.dumps(result, indent=2)) + return + + docs = result.get("data", {}).get("search", []) + if not docs: + console.print(f"[yellow]CVE {cve_id} not found[/yellow]") + return + + src = docs[0].get("_source", {}) + title = src.get("title", cve_id) + cvss = src.get("cvss", {}) + desc = src.get("description", "No description available") + published = src.get("published", "Unknown")[:10] if src.get("published") else "Unknown" + refs = src.get("references", []) + cpes = src.get("cpe", []) + + console.print(Panel(f"[bold cyan]{title}[/bold cyan]", expand=False, border_style="dim")) + console.print(f"\n[bold]CVSS Score:[/bold] {cvss.get('score', 'N/A')} ({cvss.get('vector', 'N/A')})") + console.print(f"[bold]Published:[/bold] {published}") + console.print(f"\n[bold]Description:[/bold]\n{desc}") + + if cpes: + console.print(f"\n[bold]Affected CPEs:[/bold]") + for cpe in cpes[:10]: + console.print(f" • {cpe}") + if len(cpes) > 10: + console.print(f" [dim]... and {len(cpes) - 10} more[/dim]") + + if refs: + console.print(f"\n[bold]References:[/bold]") + for ref in refs[:5]: + console.print(f" • {ref}") + if len(refs) > 5: + console.print(f" [dim]... and {len(refs) - 5} more[/dim]") + + +@main.command() +@click.option("--vendor", "-v", help="CPE vendor name") +@click.option("--product", "-p", help="CPE product name") +@click.option("--version", help="CPE version") +@click.option("-n", "--limit", default=10, help="Number of results") +@click.option("--json", "as_json", is_flag=True, help="Output as JSON") +@click.pass_context +def cpe(ctx, vendor, product, version, limit, as_json): + """Search vulnerabilities by CPE attributes""" + if not any([vendor, product, version]): + print_error("At least one of --vendor, --product, or --version required") + + client = get_client(ctx.obj["api_key"]) + try: + result = client.search_cpe(vendor=vendor, product=product, version=version, size=limit) + except Exception as e: + print_error(str(e)) + + if as_json: + click.echo(json.dumps(result, indent=2)) + return + + data = result.get("data", {}) + total = data.get("total", 0) + if total == 0: + console.print("[yellow]No results found[/yellow]") + return + + console.print(f"[green]Found {total} vulnerabilities[/green]\n") + + table = Table(show_header=True, border_style="dim") + table.add_column("ID", style="cyan") + table.add_column("Title", max_width=50) + table.add_column("CVSS", justify="right") + table.add_column("Published") + + for doc in data.get("search", []): + src = doc.get("_source", {}) + table.add_row( + src.get("id", "-"), + (src.get("title", "-")[:47] + "...") if len(src.get("title", "")) > 50 else src.get("title", "-"), + str(src.get("cvss", {}).get("score", "-")), + src.get("published", "-")[:10] if src.get("published") else "-", + ) + + console.print(table) + + +@main.command() +@click.argument("software") +@click.argument("version", required=False) +@click.option("-n", "--limit", default=20, help="Max vulnerabilities") +@click.option("--json", "as_json", is_flag=True, help="Output as JSON") +@click.pass_context +def software(ctx, software, version, limit, as_json): + """Find vulnerabilities for a software (e.g., apache 2.4.49)""" + client = get_client(ctx.obj["api_key"]) + try: + result = client.software_vulns(software, version, max_vulns=limit) + except Exception as e: + print_error(str(e)) + + if as_json: + click.echo(json.dumps(result, indent=2)) + return + + data = result.get("data", {}) + vulns = data.get("search", []) or data.get("vulnerabilities", []) + + name = f"{software} {version}" if version else software + if not vulns: + console.print(f"[yellow]No vulnerabilities found for {name}[/yellow]") + return + + console.print(f"[green]Found {len(vulns)} vulnerabilities for {name}[/green]\n") + + table = Table(show_header=True, border_style="dim") + table.add_column("CVE", style="cyan") + table.add_column("Title", max_width=50) + table.add_column("CVSS", justify="right") + + for vuln in vulns: + if isinstance(vuln, dict): + src = vuln.get("_source", vuln) + cve_id = src.get("id", "-") + title = src.get("title", "-") + cvss = src.get("cvss", {}).get("score", "-") if isinstance(src.get("cvss"), dict) else "-" + else: + cve_id = str(vuln) + title = "-" + cvss = "-" + + table.add_row( + cve_id, + (title[:47] + "...") if len(title) > 50 else title, + str(cvss), + ) + + console.print(table) + + +if __name__ == "__main__": + main() |
