aboutsummaryrefslogtreecommitdiffstats
path: root/src/vulners
diff options
context:
space:
mode:
Diffstat (limited to 'src/vulners')
-rw-r--r--src/vulners/__init__.py3
-rw-r--r--src/vulners/api.py74
-rw-r--r--src/vulners/cli.py282
3 files changed, 359 insertions, 0 deletions
diff --git a/src/vulners/__init__.py b/src/vulners/__init__.py
new file mode 100644
index 0000000..eee1892
--- /dev/null
+++ b/src/vulners/__init__.py
@@ -0,0 +1,3 @@
+"""Vulners CLI - Query CVEs and CPEs from vulners.com"""
+
+__version__ = "0.1.0"
diff --git a/src/vulners/api.py b/src/vulners/api.py
new file mode 100644
index 0000000..2c0fdbd
--- /dev/null
+++ b/src/vulners/api.py
@@ -0,0 +1,74 @@
+import os
+from dataclasses import dataclass
+from typing import Any
+
+import httpx
+
+BASE_URL = "https://vulners.com/api/v3"
+
+
+@dataclass
+class VulnersClient:
+ api_key: str | None = None
+ timeout: float = 30.0
+
+ def __post_init__(self):
+ if not self.api_key:
+ self.api_key = os.environ.get("VULNERS_API_KEY")
+
+ def _request(self, endpoint: str, payload: dict[str, Any]) -> dict[str, Any]:
+ headers = {"Content-Type": "application/json"}
+ if self.api_key:
+ headers["X-Api-Key"] = self.api_key
+
+ with httpx.Client(timeout=self.timeout) as client:
+ resp = client.post(f"{BASE_URL}{endpoint}", headers=headers, json=payload)
+ resp.raise_for_status()
+ return resp.json()
+
+ def search(
+ self,
+ query: str,
+ size: int = 10,
+ skip: int = 0,
+ fields: list[str] | None = None,
+ ) -> dict[str, Any]:
+ payload = {"query": query, "size": size, "skip": skip}
+ if fields:
+ payload["fields"] = fields
+ return self._request("/search/lucene/", payload)
+
+ def get_cve(self, cve_id: str) -> dict[str, Any]:
+ return self.search(f"id:{cve_id}", size=1)
+
+ def search_cpe(
+ self,
+ vendor: str | None = None,
+ product: str | None = None,
+ version: str | None = None,
+ size: int = 10,
+ ) -> dict[str, Any]:
+ parts = []
+ if vendor:
+ parts.append(f"cpe.vendor:{vendor}")
+ if product:
+ parts.append(f"cpe.product:{product}")
+ if version:
+ parts.append(f"cpe.version:{version}")
+ query = " AND ".join(parts) if parts else "*"
+ return self.search(query, size=size)
+
+ def software_vulns(
+ self,
+ software: str,
+ version: str | None = None,
+ max_vulns: int = 20,
+ ) -> dict[str, Any]:
+ payload = {
+ "software": software,
+ "type": "software",
+ "maxVulnerabilities": max_vulns,
+ }
+ if version:
+ payload["version"] = version
+ return self._request("/burp/softwareapi/", payload)
diff --git a/src/vulners/cli.py b/src/vulners/cli.py
new file mode 100644
index 0000000..a5d954a
--- /dev/null
+++ b/src/vulners/cli.py
@@ -0,0 +1,282 @@
+import json
+import sys
+
+import click
+from rich.console import Console
+from rich.table import Table
+from rich.panel import Panel
+from rich import print as rprint
+
+from .api import VulnersClient
+
+console = Console()
+
+
+def get_client(api_key: str | None) -> VulnersClient:
+ client = VulnersClient(api_key=api_key)
+ if not client.api_key:
+ console.print("[yellow]Warning: No API key set. Set VULNERS_API_KEY or use --api-key[/yellow]")
+ return client
+
+
+def print_error(msg: str):
+ console.print(f"[red]Error:[/red] {msg}")
+ sys.exit(1)
+
+
+@click.group(context_settings={"help_option_names": ["-h", "--help"]})
+@click.option("--api-key", envvar="VULNERS_API_KEY", help="Vulners API key")
+@click.pass_context
+def main(ctx, api_key):
+ """Query CVEs and CPEs from vulners.com"""
+ ctx.ensure_object(dict)
+ ctx.obj["api_key"] = api_key
+
+
+def build_query(
+ base: str,
+ type_: str | None,
+ cvss_min: float | None,
+ cvss_max: float | None,
+ after: str | None,
+ before: str | None,
+ vendor: str | None,
+ product: str | None,
+ version: str | None,
+ order: str | None,
+) -> str:
+ parts = [base] if base else []
+ if type_:
+ parts.append(f"type:{type_}")
+ if cvss_min is not None and cvss_max is not None:
+ parts.append(f"cvss.score:[{cvss_min} TO {cvss_max}]")
+ elif cvss_min is not None:
+ parts.append(f"cvss.score:[{cvss_min} TO 10]")
+ elif cvss_max is not None:
+ parts.append(f"cvss.score:[0 TO {cvss_max}]")
+ if after:
+ parts.append(f"published:[{after} TO *]")
+ if before:
+ parts.append(f"published:[* TO {before}]")
+ if vendor:
+ parts.append(f"affectedSoftware.vendor:{vendor}")
+ if product:
+ parts.append(f"affectedSoftware.name:{product}")
+ if version:
+ parts.append(f"affectedSoftware.version:{version}")
+ query = " AND ".join(parts) if parts else "*"
+ if order:
+ query += f" order:{order}"
+ return query
+
+
+@main.command()
+@click.argument("query", default="")
+@click.option("-n", "--limit", default=10, help="Number of results")
+@click.option("--skip", default=0, help="Skip first N results")
+@click.option("-t", "--type", "type_", help="Filter by type (cve, exploit, nessus, etc.)")
+@click.option("--cvss-min", type=float, help="Minimum CVSS score")
+@click.option("--cvss-max", type=float, help="Maximum CVSS score")
+@click.option("--after", help="Published after date (YYYY-MM-DD)")
+@click.option("--before", help="Published before date (YYYY-MM-DD)")
+@click.option("--vendor", help="Filter by software vendor")
+@click.option("--product", help="Filter by product name")
+@click.option("--version", help="Filter by version")
+@click.option("--order", type=click.Choice(["published", "cvss"]), help="Order results")
+@click.option("--full", "-f", is_flag=True, help="Show full descriptions")
+@click.option("--json", "as_json", is_flag=True, help="Output as JSON")
+@click.pass_context
+def search(ctx, query, limit, skip, type_, cvss_min, cvss_max, after, before, vendor, product, version, order, full, as_json):
+ """Search vulnerabilities using Lucene query syntax"""
+ client = get_client(ctx.obj["api_key"])
+ final_query = build_query(query, type_, cvss_min, cvss_max, after, before, vendor, product, version, order)
+ try:
+ result = client.search(final_query, size=limit, skip=skip)
+ except Exception as e:
+ print_error(str(e))
+
+ if as_json:
+ click.echo(json.dumps(result, indent=2))
+ return
+
+ data = result.get("data", {})
+ if data.get("total", 0) == 0:
+ console.print("[yellow]No results found[/yellow]")
+ return
+
+ console.print(f"[green]Found {data['total']} results[/green]\n")
+
+ for doc in data.get("search", []):
+ src = doc.get("_source", {})
+ title = src.get("title", src.get("id", "Unknown"))
+ doc_type = src.get("type", "unknown")
+ cvss = src.get("cvss", {}).get("score", "-")
+ published = src.get("published", "-")[:10] if src.get("published") else "-"
+ raw_desc = src.get("description", "")
+ truncated = not full and len(raw_desc) > 200
+ desc = raw_desc if full else raw_desc[:200]
+
+ panel = Panel(
+ f"{desc}..." if truncated else (desc or "No description"),
+ title=f"[bold]{title}[/bold]",
+ subtitle=f"Type: {doc_type} | CVSS: {cvss} | Published: {published}",
+ border_style="dim",
+ )
+ console.print(panel)
+ console.print()
+
+
+@main.command()
+@click.argument("cve_id")
+@click.option("--json", "as_json", is_flag=True, help="Output as JSON")
+@click.pass_context
+def cve(ctx, cve_id, as_json):
+ """Get details for a specific CVE (e.g., CVE-2021-44228)"""
+ client = get_client(ctx.obj["api_key"])
+ cve_id = cve_id.upper()
+
+ try:
+ result = client.get_cve(cve_id)
+ except Exception as e:
+ print_error(str(e))
+
+ if as_json:
+ click.echo(json.dumps(result, indent=2))
+ return
+
+ docs = result.get("data", {}).get("search", [])
+ if not docs:
+ console.print(f"[yellow]CVE {cve_id} not found[/yellow]")
+ return
+
+ src = docs[0].get("_source", {})
+ title = src.get("title", cve_id)
+ cvss = src.get("cvss", {})
+ desc = src.get("description", "No description available")
+ published = src.get("published", "Unknown")[:10] if src.get("published") else "Unknown"
+ refs = src.get("references", [])
+ cpes = src.get("cpe", [])
+
+ console.print(Panel(f"[bold cyan]{title}[/bold cyan]", expand=False, border_style="dim"))
+ console.print(f"\n[bold]CVSS Score:[/bold] {cvss.get('score', 'N/A')} ({cvss.get('vector', 'N/A')})")
+ console.print(f"[bold]Published:[/bold] {published}")
+ console.print(f"\n[bold]Description:[/bold]\n{desc}")
+
+ if cpes:
+ console.print(f"\n[bold]Affected CPEs:[/bold]")
+ for cpe in cpes[:10]:
+ console.print(f" • {cpe}")
+ if len(cpes) > 10:
+ console.print(f" [dim]... and {len(cpes) - 10} more[/dim]")
+
+ if refs:
+ console.print(f"\n[bold]References:[/bold]")
+ for ref in refs[:5]:
+ console.print(f" • {ref}")
+ if len(refs) > 5:
+ console.print(f" [dim]... and {len(refs) - 5} more[/dim]")
+
+
+@main.command()
+@click.option("--vendor", "-v", help="CPE vendor name")
+@click.option("--product", "-p", help="CPE product name")
+@click.option("--version", help="CPE version")
+@click.option("-n", "--limit", default=10, help="Number of results")
+@click.option("--json", "as_json", is_flag=True, help="Output as JSON")
+@click.pass_context
+def cpe(ctx, vendor, product, version, limit, as_json):
+ """Search vulnerabilities by CPE attributes"""
+ if not any([vendor, product, version]):
+ print_error("At least one of --vendor, --product, or --version required")
+
+ client = get_client(ctx.obj["api_key"])
+ try:
+ result = client.search_cpe(vendor=vendor, product=product, version=version, size=limit)
+ except Exception as e:
+ print_error(str(e))
+
+ if as_json:
+ click.echo(json.dumps(result, indent=2))
+ return
+
+ data = result.get("data", {})
+ total = data.get("total", 0)
+ if total == 0:
+ console.print("[yellow]No results found[/yellow]")
+ return
+
+ console.print(f"[green]Found {total} vulnerabilities[/green]\n")
+
+ table = Table(show_header=True, border_style="dim")
+ table.add_column("ID", style="cyan")
+ table.add_column("Title", max_width=50)
+ table.add_column("CVSS", justify="right")
+ table.add_column("Published")
+
+ for doc in data.get("search", []):
+ src = doc.get("_source", {})
+ table.add_row(
+ src.get("id", "-"),
+ (src.get("title", "-")[:47] + "...") if len(src.get("title", "")) > 50 else src.get("title", "-"),
+ str(src.get("cvss", {}).get("score", "-")),
+ src.get("published", "-")[:10] if src.get("published") else "-",
+ )
+
+ console.print(table)
+
+
+@main.command()
+@click.argument("software")
+@click.argument("version", required=False)
+@click.option("-n", "--limit", default=20, help="Max vulnerabilities")
+@click.option("--json", "as_json", is_flag=True, help="Output as JSON")
+@click.pass_context
+def software(ctx, software, version, limit, as_json):
+ """Find vulnerabilities for a software (e.g., apache 2.4.49)"""
+ client = get_client(ctx.obj["api_key"])
+ try:
+ result = client.software_vulns(software, version, max_vulns=limit)
+ except Exception as e:
+ print_error(str(e))
+
+ if as_json:
+ click.echo(json.dumps(result, indent=2))
+ return
+
+ data = result.get("data", {})
+ vulns = data.get("search", []) or data.get("vulnerabilities", [])
+
+ name = f"{software} {version}" if version else software
+ if not vulns:
+ console.print(f"[yellow]No vulnerabilities found for {name}[/yellow]")
+ return
+
+ console.print(f"[green]Found {len(vulns)} vulnerabilities for {name}[/green]\n")
+
+ table = Table(show_header=True, border_style="dim")
+ table.add_column("CVE", style="cyan")
+ table.add_column("Title", max_width=50)
+ table.add_column("CVSS", justify="right")
+
+ for vuln in vulns:
+ if isinstance(vuln, dict):
+ src = vuln.get("_source", vuln)
+ cve_id = src.get("id", "-")
+ title = src.get("title", "-")
+ cvss = src.get("cvss", {}).get("score", "-") if isinstance(src.get("cvss"), dict) else "-"
+ else:
+ cve_id = str(vuln)
+ title = "-"
+ cvss = "-"
+
+ table.add_row(
+ cve_id,
+ (title[:47] + "...") if len(title) > 50 else title,
+ str(cvss),
+ )
+
+ console.print(table)
+
+
+if __name__ == "__main__":
+ main()