"""Client for the TMview trademark search API.

Supports text search (``search``) and reverse image search
(``search_by_image``) across EU national offices, EUIPO, and WIPO.
Responses are cached via :mod:`tmview.cache`; transient HTTP failures
are retried with a fixed back-off schedule.
"""

import hashlib
import mimetypes
import time
from pathlib import Path

import httpx

from tmview import cache as _cache

BASE_URL = "https://www.tmdn.org/tmview/api"

# Office code -> human-readable office name, for every office we query.
OFFICE_NAMES = {
    "EM": "EUIPO",
    "WO": "WIPO (Intl)",
    "AT": "Austria",
    "BG": "Bulgaria",
    "BX": "Benelux",
    "CY": "Cyprus",
    "CZ": "Czechia",
    "DE": "Germany",
    "DK": "Denmark",
    "EE": "Estonia",
    "ES": "Spain",
    "FI": "Finland",
    "FR": "France",
    "GR": "Greece",
    "HR": "Croatia",
    "HU": "Hungary",
    "IE": "Ireland",
    "IT": "Italy",
    "LT": "Lithuania",
    "LV": "Latvia",
    "MT": "Malta",
    "PL": "Poland",
    "PT": "Portugal",
    "RO": "Romania",
    "SE": "Sweden",
    "SI": "Slovenia",
    "SK": "Slovakia",
}

# WO (WIPO) is included: Madrid Protocol international registrations that
# designate EU member states are filed at WO, not at national/EUIPO offices.
EU_OFFICES = list(OFFICE_NAMES.keys())

# Back-off schedule in seconds; one retry attempt per entry.
RETRY_DELAYS = [2, 4, 8]
# Transient upstream/gateway statuses that are worth retrying.
RETRYABLE_CODES = {503, 504}

# Browser-like headers — presumably the endpoint rejects plain
# library user-agents; NOTE(review): confirm which headers are required.
_HEADERS = {
    "Content-Type": "application/json",
    "Accept": "application/json",
    "Accept-Language": "en",
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/121.0 Safari/537.36",
    "Referer": "https://www.tmdn.org/tmview/",
    "Origin": "https://www.tmdn.org",
}

# Upload headers deliberately omit Content-Type so httpx can set the
# multipart boundary itself.
_UPLOAD_HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/121.0 Safari/537.36",
    "Referer": "https://www.tmdn.org/tmview/",
    "Origin": "https://www.tmdn.org",
    "Accept": "application/json",
}


def _build_payload(query, offices, limit, page, classes, status):
    """Build the JSON body for a text search request.

    :param query: free-text search term.
    :param offices: list of office codes to search.
    :param limit: page size.
    :param page: 1-based page number.
    :param classes: optional iterable of Nice class numbers.
    :param status: optional status filter ("registered"/"pending"/"expired");
        unrecognized values are silently ignored.
    """
    payload = {
        "basicSearch": query,
        "criteria": "C",
        "page": page,
        "pageSize": limit,
        "offices": offices,
    }
    if classes:
        payload["niceClass"] = [int(c) for c in classes]
    if status:
        # Map our lowercase CLI-style values onto the API's enum values.
        status_map = {
            "registered": "Registered",
            "pending": "Filed",
            "expired": "Expired",
        }
        mapped = status_map.get(status.lower())
        if mapped:
            payload["tmStatus"] = [mapped]
    return payload


def _parse_trademark(tm):
    """Flatten one raw API trademark record into the dict shape we expose."""
    office_code = tm.get("tmOffice", "")
    applicants = tm.get("applicantName") or []
    owner = applicants[0] if applicants else ""
    classes = tm.get("niceClass") or []
    app_date = tm.get("applicationDate", "")
    if app_date and len(app_date) >= 10:
        # Keep only the YYYY-MM-DD date part.
        app_date = app_date[:10]
    return {
        "office": office_code,
        "office_name": OFFICE_NAMES.get(office_code, office_code),
        "name": tm.get("tmName", ""),
        "status": tm.get("tradeMarkStatus", ""),
        "type": tm.get("tradeMarkType", ""),
        "owner": owner,
        "classes": classes,
        "application_date": app_date,
        "application_number": tm.get("applicationNumber", ""),
        "registration_date": (tm.get("registrationDate") or "")[:10],
        "st13": tm.get("ST13", ""),
        "image_url": tm.get("detailImageURI") or tm.get("markImageURI") or "",
    }


def _load_cached(key):
    """Return the cached result for *key* (marked as from cache), or None."""
    cached = _cache.load(key)
    if cached is None:
        return None
    result, fetched_at = cached
    result["fetched_at"] = fetched_at
    result["from_cache"] = True
    return result


def _post_search(payload, key, page):
    """POST *payload* to the search endpoint, retrying transient failures.

    Retries 503/504 responses and connection/timeout errors, sleeping
    through RETRY_DELAYS between attempts. On success, parses the result,
    saves it under cache *key*, and returns it. Raises RuntimeError with
    the attempt count on a non-retryable or exhausted failure.

    This helper deduplicates the request/retry/parse/cache logic that was
    previously copied verbatim in both search() and search_by_image().
    """
    delays = RETRY_DELAYS[:]
    attempt = 0
    while True:
        try:
            with httpx.Client(timeout=30) as client:
                resp = client.post(
                    f"{BASE_URL}/search/results",
                    params={"translate": "true"},
                    json=payload,
                    headers=_HEADERS,
                )
                if resp.status_code in RETRYABLE_CODES and delays:
                    time.sleep(delays.pop(0))
                    attempt += 1
                    continue
                resp.raise_for_status()
                data = resp.json()
            trademarks_raw = data.get("tradeMarks") or []
            # The API has used both keys for the total count.
            total = data.get("totalResults") or data.get("total") or len(trademarks_raw)
            trademarks = [_parse_trademark(tm) for tm in trademarks_raw]
            fetched_at = _cache.now_iso()
            result = {"trademarks": trademarks, "total": total, "page": page}
            _cache.save(key, result, fetched_at)
            result["fetched_at"] = fetched_at
            result["from_cache"] = False
            return result
        except httpx.HTTPStatusError as exc:
            raise RuntimeError(
                f"HTTP {exc.response.status_code} after {attempt + 1} attempt(s): "
                f"{exc.response.text[:200]}"
            ) from exc
        except (httpx.TimeoutException, httpx.ConnectError, httpx.RemoteProtocolError) as exc:
            if delays:
                time.sleep(delays.pop(0))
                attempt += 1
                continue
            raise RuntimeError(
                f"Connection failed after {attempt + 1} attempt(s): {exc}"
            ) from exc


def search(query, offices=None, limit=20, page=1, classes=None, status=None):
    """Text search for trademarks.

    Returns a dict with "trademarks", "total", "page", "fetched_at", and
    "from_cache" keys. Cached responses are returned without hitting the
    network. Raises RuntimeError on HTTP or connection failure.
    """
    if offices is None:
        offices = EU_OFFICES
    payload = _build_payload(query, offices, limit, page, classes, status)
    key = _cache.cache_key(payload)
    cached = _load_cached(key)
    if cached is not None:
        return cached
    return _post_search(payload, key, page)


def _upload_image(image_path: str) -> dict:
    """Upload an image and return the parameters for an image search.

    The upload endpoint segments the image; we use the server-selected
    segment, falling back to the first one. Raises RuntimeError on upload
    failure or when no segments are returned.
    """
    path = Path(image_path)
    # Fix: label the upload with the file's actual MIME type instead of
    # always claiming "image/jpeg" (PNG/GIF uploads were mislabeled).
    content_type = mimetypes.guess_type(path.name)[0] or "image/jpeg"
    with httpx.Client(timeout=60, headers=_UPLOAD_HEADERS) as client:
        with path.open("rb") as fh:
            resp = client.post(
                f"{BASE_URL}/imageSearch/tm/uploadAndSegments",
                files={"file": (path.name, fh, content_type)},
                data={"clienttype": "desktop"},
            )
    try:
        resp.raise_for_status()
    except httpx.HTTPStatusError as exc:
        raise RuntimeError(
            f"Image upload failed (HTTP {exc.response.status_code}): {exc.response.text[:200]}"
        ) from exc
    data = resp.json()
    seg = next((s for s in data.get("segments", []) if s.get("isSelected")), None)
    if seg is None and data.get("segments"):
        seg = data["segments"][0]
    if seg is None:
        raise RuntimeError("Image upload succeeded but returned no segments")
    return {
        "imageId": data["imageId"],
        "imageName": data["imageName"],
        "segmentLeft": str(seg["left"]),
        "segmentRight": str(seg["right"]),
        "segmentTop": str(seg["upper"]),
        "segmentBottom": str(seg["lower"]),
    }


def search_by_image(image_path: str, offices=None, limit=20, page=1):
    """Reverse image search for trademarks.

    Caches by the SHA-256 of the image bytes (plus search parameters), so
    a repeated search of the same file skips both the upload and the
    search request. Returns the same dict shape as search().
    """
    if offices is None:
        offices = EU_OFFICES
    image_bytes = Path(image_path).read_bytes()
    image_hash = hashlib.sha256(image_bytes).hexdigest()
    cache_params = {
        "image_sha256": image_hash,
        "offices": offices,
        "limit": limit,
        "page": page,
    }
    key = _cache.cache_key(cache_params)
    cached = _load_cached(key)
    if cached is not None:
        return cached
    # Only upload after the cache miss is confirmed.
    img = _upload_image(image_path)
    payload = {
        "imageId": img["imageId"],
        "imageName": img["imageName"],
        "segmentLeft": img["segmentLeft"],
        "segmentRight": img["segmentRight"],
        "segmentTop": img["segmentTop"],
        "segmentBottom": img["segmentBottom"],
        "colour": "false",
        "criteria": "C",
        "imageSearch": True,
        "offices": offices,
        "page": page,
        "pageSize": limit,
    }
    return _post_search(payload, key, page)