diff options
Diffstat (limited to 'src/nvd/rate_limiter.py')
| -rw-r--r-- | src/nvd/rate_limiter.py | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/src/nvd/rate_limiter.py b/src/nvd/rate_limiter.py new file mode 100644 index 0000000..be0fddf --- /dev/null +++ b/src/nvd/rate_limiter.py @@ -0,0 +1,67 @@ +"""Async rate limiter for NVD API requests.""" + +import asyncio +import time +from collections import deque +from typing import Deque + + +class RateLimiter: + """Sliding window rate limiter for async operations. + + NVD API rate limits: + - Without API key: 5 requests per 30 seconds + - With API key: 50 requests per 30 seconds + """ + + def __init__(self, max_requests: int = 5, window_seconds: int = 30) -> None: + """Initialize rate limiter. + + Args: + max_requests: Maximum number of requests allowed in the time window + window_seconds: Time window in seconds + """ + self.max_requests = max_requests + self.window_seconds = window_seconds + self._requests: Deque[float] = deque() + self._lock = asyncio.Lock() + + async def acquire(self) -> None: + """Acquire permission to make a request, waiting if necessary.""" + async with self._lock: + now = time.time() + + # Remove requests outside the current window + while self._requests and self._requests[0] <= now - self.window_seconds: + self._requests.popleft() + + # If at limit, wait until oldest request expires + if len(self._requests) >= self.max_requests: + sleep_time = self._requests[0] + self.window_seconds - now + if sleep_time > 0: + await asyncio.sleep(sleep_time) + # Recursively try again after waiting + return await self.acquire() + + # Record this request + self._requests.append(now) + + def configure(self, has_api_key: bool) -> None: + """Configure rate limiter based on whether an API key is present. + + Args: + has_api_key: Whether an API key is being used + """ + if has_api_key: + self.max_requests = 50 + else: + self.max_requests = 5 + + async def __aenter__(self) -> "RateLimiter": + """Async context manager entry.""" + await self.acquire() + return self + + async def __aexit__(self, *args: object) -> None: + """Async context manager exit.""" + pass |
