"""Async rate limiter for NVD API requests.""" import asyncio import time from collections import deque from typing import Deque class RateLimiter: """Sliding window rate limiter for async operations. NVD API rate limits: - Without API key: 5 requests per 30 seconds - With API key: 50 requests per 30 seconds """ def __init__(self, max_requests: int = 5, window_seconds: int = 30) -> None: """Initialize rate limiter. Args: max_requests: Maximum number of requests allowed in the time window window_seconds: Time window in seconds """ self.max_requests = max_requests self.window_seconds = window_seconds self._requests: Deque[float] = deque() self._lock = asyncio.Lock() async def acquire(self) -> None: """Acquire permission to make a request, waiting if necessary.""" async with self._lock: now = time.time() # Remove requests outside the current window while self._requests and self._requests[0] <= now - self.window_seconds: self._requests.popleft() # If at limit, wait until oldest request expires if len(self._requests) >= self.max_requests: sleep_time = self._requests[0] + self.window_seconds - now if sleep_time > 0: await asyncio.sleep(sleep_time) # Recursively try again after waiting return await self.acquire() # Record this request self._requests.append(now) def configure(self, has_api_key: bool) -> None: """Configure rate limiter based on whether an API key is present. Args: has_api_key: Whether an API key is being used """ if has_api_key: self.max_requests = 50 else: self.max_requests = 5 async def __aenter__(self) -> "RateLimiter": """Async context manager entry.""" await self.acquire() return self async def __aexit__(self, *args: object) -> None: """Async context manager exit.""" pass