diff options
| author | Claude <claude@anthropic.com> | 2026-03-04 19:14:55 +0100 |
|---|---|---|
| committer | Claude <claude@anthropic.com> | 2026-03-04 19:14:55 +0100 |
| commit | 171c5b86ef05974426ba5c5d8547c8025977d1a2 (patch) | |
| tree | 2a1193e2bb81a6341e55d0b883a3fc33f77f8be1 /src/gemini/auth.py | |
| parent | 9f14edf2b97286e02830d528038b32d5b31aaa0a (diff) | |
| parent | 0278c87f062a9ae7d617b92be22b175558a05086 (diff) | |
| download | gemini-py-main.tar.gz gemini-py-main.zip | |
Diffstat (limited to 'src/gemini/auth.py')
| -rw-r--r-- | src/gemini/auth.py | 91 |
1 files changed, 91 insertions, 0 deletions
diff --git a/src/gemini/auth.py b/src/gemini/auth.py new file mode 100644 index 0000000..bfbd85b --- /dev/null +++ b/src/gemini/auth.py @@ -0,0 +1,91 @@ +import json +import os +import time +from pathlib import Path + +import httpx + +OAUTH_CLIENT_ID = "681255809395-oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com" +OAUTH_CLIENT_SECRET = "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" +TOKEN_URL = "https://oauth2.googleapis.com/token" +DEFAULT_CREDS_PATH = Path.home() / ".gemini" / "oauth_creds.json" + + +class OAuthCredentials: + """ + Resolves credentials in priority order: + 1. Environment variables (GEMINI_REFRESH_TOKEN, GEMINI_ACCESS_TOKEN, GEMINI_TOKEN_EXPIRY) + 2. Credentials file (path argument or ~/.gemini/oauth_creds.json) + """ + + def __init__(self, creds_path: str | None = None): + self._path = Path(creds_path) if creds_path else DEFAULT_CREDS_PATH + self._data: dict = {} + self._from_env = False + self._load() + + def _load(self): + refresh = os.environ.get("GEMINI_REFRESH_TOKEN") + if refresh: + self._from_env = True + self._data = { + "refresh_token": refresh, + "access_token": os.environ.get("GEMINI_ACCESS_TOKEN", ""), + "expiry_date": int(os.environ.get("GEMINI_TOKEN_EXPIRY", "0")), + } + return + + if self._path.exists(): + self._data = json.loads(self._path.read_text()) + + def _save(self): + # Don't write back when credentials came from env vars + if self._from_env: + return + self._path.parent.mkdir(parents=True, exist_ok=True) + self._path.write_text(json.dumps(self._data, indent=2)) + os.chmod(self._path, 0o600) + + @property + def refresh_token(self) -> str | None: + return self._data.get("refresh_token") + + @property + def access_token(self) -> str | None: + return self._data.get("access_token") + + @property + def expiry_ms(self) -> int: + return self._data.get("expiry_date", 0) + + def is_expired(self) -> bool: + if not self.access_token: + return True + return time.time() * 1000 >= (self.expiry_ms - 300_000) + + async def get_valid_token(self, client: httpx.AsyncClient) -> str: + if self.is_expired(): + await self._refresh(client) + return self.access_token or "" + + async def _refresh(self, client: httpx.AsyncClient): + if not self.refresh_token: + raise ValueError( + "No refresh token found. Set GEMINI_REFRESH_TOKEN or provide a credentials file.\n" + "Obtain it by running: gemini (and completing the login flow)" + ) + resp = await client.post( + TOKEN_URL, + data={ + "refresh_token": self.refresh_token, + "client_id": OAUTH_CLIENT_ID, + "client_secret": OAUTH_CLIENT_SECRET, + "grant_type": "refresh_token", + }, + ) + resp.raise_for_status() + tokens = resp.json() + self._data.update(tokens) + if "expires_in" in tokens: + self._data["expiry_date"] = int((time.time() + tokens["expires_in"]) * 1000) + self._save() |
