aboutsummaryrefslogtreecommitdiffstats
path: root/src/nvd/rate_limiter.py
diff options
context:
space:
mode:
authorLouis Burda <dev@sinitax.com>2026-01-30 03:04:01 +0100
committerLouis Burda <dev@sinitax.com>2026-01-30 03:04:01 +0100
commitf6487c615cff023db1574e2c23db78bf02a43709 (patch)
tree8a0e793a8ea28b2a5eef5dcd509b6c6a2466ee1c /src/nvd/rate_limiter.py
downloadnvdb-py-main.tar.gz
nvdb-py-main.zip
Add initial versionHEADmain
Diffstat (limited to 'src/nvd/rate_limiter.py')
-rw-r--r--src/nvd/rate_limiter.py67
1 files changed, 67 insertions, 0 deletions
diff --git a/src/nvd/rate_limiter.py b/src/nvd/rate_limiter.py
new file mode 100644
index 0000000..be0fddf
--- /dev/null
+++ b/src/nvd/rate_limiter.py
@@ -0,0 +1,67 @@
+"""Async rate limiter for NVD API requests."""
+
+import asyncio
+import time
+from collections import deque
+from typing import Deque
+
+
+class RateLimiter:
+ """Sliding window rate limiter for async operations.
+
+ NVD API rate limits:
+ - Without API key: 5 requests per 30 seconds
+ - With API key: 50 requests per 30 seconds
+ """
+
+ def __init__(self, max_requests: int = 5, window_seconds: int = 30) -> None:
+ """Initialize rate limiter.
+
+ Args:
+ max_requests: Maximum number of requests allowed in the time window
+ window_seconds: Time window in seconds
+ """
+ self.max_requests = max_requests
+ self.window_seconds = window_seconds
+ self._requests: Deque[float] = deque()
+ self._lock = asyncio.Lock()
+
+ async def acquire(self) -> None:
+ """Acquire permission to make a request, waiting if necessary."""
+ async with self._lock:
+ now = time.time()
+
+ # Remove requests outside the current window
+ while self._requests and self._requests[0] <= now - self.window_seconds:
+ self._requests.popleft()
+
+ # If at limit, wait until oldest request expires
+ if len(self._requests) >= self.max_requests:
+ sleep_time = self._requests[0] + self.window_seconds - now
+ if sleep_time > 0:
+ await asyncio.sleep(sleep_time)
+ # Recursively try again after waiting
+ return await self.acquire()
+
+ # Record this request
+ self._requests.append(now)
+
+ def configure(self, has_api_key: bool) -> None:
+ """Configure rate limiter based on whether an API key is present.
+
+ Args:
+ has_api_key: Whether an API key is being used
+ """
+ if has_api_key:
+ self.max_requests = 50
+ else:
+ self.max_requests = 5
+
+ async def __aenter__(self) -> "RateLimiter":
+ """Async context manager entry."""
+ await self.acquire()
+ return self
+
+ async def __aexit__(self, *args: object) -> None:
+ """Async context manager exit."""
+ pass