"""Thin synchronous client for the CVEDB HTTP API at cvedb.shodan.io."""

from typing import Any
from urllib.parse import quote

import httpx

from cvedb.models import CVE, CVEWithCPEs

BASE_URL = "https://cvedb.shodan.io"


class CVEDBClient:
    """Blocking client wrapping an ``httpx.Client`` bound to ``BASE_URL``.

    Use it as a context manager, or call :meth:`close` explicitly, so the
    underlying HTTP client is released::

        with CVEDBClient() as client:
            cve = client.get_cve("CVE-2021-44228")
    """

    def __init__(self, timeout: float = 30.0):
        """Create the underlying HTTP client.

        Args:
            timeout: Timeout value handed to ``httpx.Client``.
        """
        self._client = httpx.Client(base_url=BASE_URL, timeout=timeout)

    def __enter__(self) -> "CVEDBClient":
        return self

    def __exit__(self, *args) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self._client.close()

    def _get_json(self, path: str, params: dict[str, Any] | None = None) -> Any:
        """GET ``path``, raise on an error status, and return the decoded JSON."""
        resp = self._client.get(path, params=params)
        resp.raise_for_status()
        return resp.json()

    def get_cve(self, cve_id: str) -> CVEWithCPEs:
        """Fetch a single CVE record from ``/cve/{cve_id}``.

        Args:
            cve_id: Identifier placed in the URL path.

        Returns:
            The response body validated as a ``CVEWithCPEs`` model.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        # Percent-encode the identifier so characters such as "/", "?" or "#"
        # stay inside this one path segment instead of altering the request
        # target. Ordinary IDs like "CVE-2021-44228" are left unchanged.
        safe_id = quote(cve_id, safe="")
        return CVEWithCPEs.model_validate(self._get_json(f"/cve/{safe_id}"))

    def get_cpes(
        self,
        product: str,
        skip: int = 0,
        limit: int | None = 1000,
        count: bool = False,
    ) -> list[str] | int:
        """Query the ``/cpes`` endpoint for a product.

        Args:
            product: Sent as the ``product`` query parameter.
            skip: Sent as the ``skip`` query parameter.
            limit: Sent as ``limit``; omitted from the request when ``None``.
            count: Sent as ``count``; also selects which field of the
                response is returned.

        Returns:
            The response's ``"total"`` value (``0`` if absent) when ``count``
            is true, otherwise its ``"cpes"`` list (``[]`` if absent).

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        params: dict[str, Any] = {"product": product, "skip": skip, "count": count}
        if limit is not None:
            params["limit"] = limit

        data = self._get_json("/cpes", params)
        if count:
            return data.get("total", 0)
        return data.get("cpes", [])

    def get_cves(
        self,
        cpe23: str | None = None,
        product: str | None = None,
        skip: int = 0,
        limit: int | None = 1000,
        count: bool = False,
        is_kev: bool = False,
        sort_by_epss: bool = False,
        start_date: str | None = None,
        end_date: str | None = None,
    ) -> list[CVE] | int:
        """Query the ``/cves`` endpoint.

        Only ``skip`` and ``count`` are always sent; every other filter is
        added to the query string only when it is set (truthy, or not
        ``None`` in the case of ``limit``).

        Args:
            cpe23: Sent as the ``cpe23`` query parameter when non-empty.
            product: Sent as the ``product`` query parameter when non-empty.
            skip: Sent as the ``skip`` query parameter.
            limit: Sent as ``limit``; omitted from the request when ``None``.
            count: Sent as ``count``; also selects which field of the
                response is returned.
            is_kev: When true, sends ``is_kev=True``.
                NOTE(review): presumably restricts results to known
                exploited vulnerabilities; confirm against the API docs.
            sort_by_epss: When true, sends ``sort_by_epss=True``.
            start_date: Sent as ``start_date`` when non-empty.
                NOTE(review): expected date format is not visible here.
            end_date: Sent as ``end_date`` when non-empty.

        Returns:
            The response's ``"total"`` value (``0`` if absent) when ``count``
            is true, otherwise the ``"cves"`` entries each validated as a
            ``CVE`` model.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        params: dict[str, Any] = {"skip": skip, "count": count}
        if limit is not None:
            params["limit"] = limit
        if cpe23:
            params["cpe23"] = cpe23
        if product:
            params["product"] = product
        if is_kev:
            params["is_kev"] = True
        if sort_by_epss:
            params["sort_by_epss"] = True
        if start_date:
            params["start_date"] = start_date
        if end_date:
            params["end_date"] = end_date

        data = self._get_json("/cves", params)
        if count:
            return data.get("total", 0)
        return [CVE.model_validate(c) for c in data.get("cves", [])]