blob: be0fddf0b50b860e2cac3d102e325a196abdc8ba (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
"""Async rate limiter for NVD API requests."""
import asyncio
import time
from collections import deque
from typing import Deque
class RateLimiter:
"""Sliding window rate limiter for async operations.
NVD API rate limits:
- Without API key: 5 requests per 30 seconds
- With API key: 50 requests per 30 seconds
"""
def __init__(self, max_requests: int = 5, window_seconds: int = 30) -> None:
"""Initialize rate limiter.
Args:
max_requests: Maximum number of requests allowed in the time window
window_seconds: Time window in seconds
"""
self.max_requests = max_requests
self.window_seconds = window_seconds
self._requests: Deque[float] = deque()
self._lock = asyncio.Lock()
async def acquire(self) -> None:
"""Acquire permission to make a request, waiting if necessary."""
async with self._lock:
now = time.time()
# Remove requests outside the current window
while self._requests and self._requests[0] <= now - self.window_seconds:
self._requests.popleft()
# If at limit, wait until oldest request expires
if len(self._requests) >= self.max_requests:
sleep_time = self._requests[0] + self.window_seconds - now
if sleep_time > 0:
await asyncio.sleep(sleep_time)
# Recursively try again after waiting
return await self.acquire()
# Record this request
self._requests.append(now)
def configure(self, has_api_key: bool) -> None:
"""Configure rate limiter based on whether an API key is present.
Args:
has_api_key: Whether an API key is being used
"""
if has_api_key:
self.max_requests = 50
else:
self.max_requests = 5
async def __aenter__(self) -> "RateLimiter":
"""Async context manager entry."""
await self.acquire()
return self
async def __aexit__(self, *args: object) -> None:
"""Async context manager exit."""
pass
|