about · summary · refs · log · tree · commit · diff · stats
path: root/src/nvd/rate_limiter.py
blob: be0fddf0b50b860e2cac3d102e325a196abdc8ba (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""Async rate limiter for NVD API requests."""

import asyncio
import time
from collections import deque
from typing import Deque


class RateLimiter:
    """Sliding window rate limiter for async operations.

    At most ``max_requests`` calls to :meth:`acquire` are allowed to complete
    within any ``window_seconds``-long interval; further callers are suspended
    until the oldest recorded request leaves the window.

    NVD API rate limits:
    - Without API key: 5 requests per 30 seconds
    - With API key: 50 requests per 30 seconds

    The limiter can be used directly (``await limiter.acquire()``) or as an
    async context manager (``async with limiter: ...``).
    """

    # Request budgets applied by ``configure``.
    _MAX_REQUESTS_WITH_KEY = 50
    _MAX_REQUESTS_WITHOUT_KEY = 5

    def __init__(self, max_requests: int = 5, window_seconds: int = 30) -> None:
        """Initialize rate limiter.

        Args:
            max_requests: Maximum number of requests allowed in the time window
            window_seconds: Time window in seconds
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Monotonic timestamps of the requests still inside the window,
        # oldest first.
        self._requests: Deque[float] = deque()
        # Serializes bookkeeping; waiters queue on this lock while the
        # current holder sleeps for a free slot.
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Acquire permission to make a request, waiting if necessary.

        Raises:
            ValueError: If ``max_requests`` is not positive, in which case no
                request could ever be admitted.
        """
        # The lock is taken exactly once and the wait is a loop: asyncio.Lock
        # cannot be re-acquired by its holder, so retrying by calling
        # ``acquire()`` again while holding it would block forever.
        async with self._lock:
            while True:
                # Monotonic clock: wall-clock adjustments must not stretch or
                # shrink the window.
                now = time.monotonic()
                cutoff = now - self.window_seconds

                # Remove requests outside the current window
                while self._requests and self._requests[0] <= cutoff:
                    self._requests.popleft()

                if len(self._requests) < self.max_requests:
                    # Record this request
                    self._requests.append(now)
                    return

                if not self._requests:
                    # Over the limit with nothing recorded: the limit itself
                    # is zero or negative, so waiting can never help.
                    raise ValueError("max_requests must be at least 1")

                # At the limit: sleep until the oldest request expires, then
                # re-check (the limit may also have been reconfigured).
                await asyncio.sleep(
                    self._requests[0] + self.window_seconds - now
                )

    def configure(self, has_api_key: bool) -> None:
        """Configure rate limiter based on whether an API key is present.

        Only ``max_requests`` is changed; the window length and the requests
        already recorded are left untouched.

        Args:
            has_api_key: Whether an API key is being used
        """
        self.max_requests = (
            self._MAX_REQUESTS_WITH_KEY
            if has_api_key
            else self._MAX_REQUESTS_WITHOUT_KEY
        )

    async def __aenter__(self) -> "RateLimiter":
        """Async context manager entry: wait for a slot, then return self."""
        await self.acquire()
        return self

    async def __aexit__(self, *args: object) -> None:
        """Async context manager exit.

        Nothing is released here: a slot frees itself once its timestamp
        leaves the window. Exceptions from the body are not suppressed.
        """