"""TermSearch API.

Looks a term up in three online glossaries concurrently and matches it
against a local taxonomy JSON file, returning everything in one response.
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import time
from functools import lru_cache
from pathlib import Path
from typing import List, Optional
from urllib.parse import quote_plus

import httpx
import structlog
from asgi_correlation_id import CorrelationIdMiddleware
from asgi_correlation_id.context import correlation_id
from fastapi import FastAPI, Query, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from selectolax.parser import HTMLParser
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


# NOTE: the models and route handlers below are documented with comments, not
# docstrings, so that the generated OpenAPI descriptions stay unchanged.


# One definition scraped from an external glossary.
class Definition(BaseModel):
    source: str  # identifier of the glossary the text came from
    title: str  # headword reported by the source, or the searched term
    url: str  # page the definition text was extracted from
    definition: str  # whitespace-normalised definition text


# One taxonomy entry whose name contains every token of the searched term.
class TaxonomyMatch(BaseModel):
    category: str
    class_name: str
    class_code: str
    type_description: Optional[str] = None  # unset for class-level matches
    type_code: Optional[str] = None  # unset for class-level matches
    annex: Optional[str] = None
    full_name: str  # "<type description> <class name>", or just the class name


# Response body of GET /api/definitions.
class DefinitionResponse(BaseModel):
    term: str
    results: List[Definition]
    request_id: Optional[str] = None
    taxonomy: List[TaxonomyMatch] = Field(default_factory=list)


app = FastAPI(title="TermSearch API", version="0.1.0")

# Emit structlog events as one JSON object per line through stdlib logging.
logging.basicConfig(format="%(message)s", level=logging.INFO)
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.EventRenamer("event"),
        structlog.processors.JSONRenderer(),
    ]
)
logger = structlog.get_logger("termsearch")

app.add_middleware(
    CorrelationIdMiddleware,
    header_name="X-Request-ID",
)

# FRONTEND_ORIGIN may hold several comma-separated origins.
frontend_origin = os.getenv("FRONTEND_ORIGIN", "http://localhost:5173")
allowed_origins = [origin.strip() for origin in frontend_origin.split(",") if origin.strip()]
app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def normalize_text(text: str) -> str:
    """Lower-case *text* and collapse every whitespace run to one space."""
    return " ".join(text.lower().split())


def _as_dict(value: object) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _request_url(exc: httpx.RequestError, fallback: str) -> str:
    """Return the URL of the request that failed, or *fallback* if unknown.

    ``RequestError.request`` raises ``RuntimeError`` instead of returning
    ``None`` when no request is attached, so a truthiness test cannot be used.
    """
    try:
        return str(exc.request.url)
    except RuntimeError:
        return fallback


@lru_cache(maxsize=1)
def load_taxonomy() -> dict:
    """Load the taxonomy JSON file, caching the parsed result.

    Looks for ``data/iso-14244-tax.json`` one directory above this file's
    directory and falls back to ``/data/iso-14244-tax.json``.

    Raises:
        OSError: if neither file can be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    root_dir = Path(__file__).resolve().parents[1]
    tax_path = root_dir / "data" / "iso-14244-tax.json"
    if not tax_path.exists():
        tax_path = Path("/data/iso-14244-tax.json")
    with tax_path.open("r", encoding="utf-8") as handle:
        return json.load(handle)


def find_taxonomy_matches(term: str) -> List[TaxonomyMatch]:
    """Return taxonomy entries whose name contains every token of *term*.

    Matching is a case-insensitive substring test per whitespace-separated
    token. For each class, type-level entries ("<description> <class name>")
    are tested first, then the bare class name. Nodes of the taxonomy that
    are not dicts are treated as empty instead of raising.
    """
    term_tokens = normalize_text(term).split()
    if not term_tokens:
        return []

    def contains_all_tokens(text: str) -> bool:
        return all(token in text for token in term_tokens)

    taxonomy = _as_dict(_as_dict(load_taxonomy()).get("taxonomy"))
    categories = _as_dict(taxonomy.get("categories"))

    matches: List[TaxonomyMatch] = []
    for category_name, category in categories.items():
        classes = _as_dict(_as_dict(category).get("classes"))
        for class_code, raw_class_info in classes.items():
            class_info = _as_dict(raw_class_info)
            # `or ""` also covers an explicit JSON null.
            class_name = class_info.get("name") or ""
            annex = class_info.get("annex")

            for type_code, type_info in _as_dict(class_info.get("types")).items():
                type_description = _as_dict(type_info).get("description") or ""
                full_name = f"{type_description} {class_name}".strip()
                if contains_all_tokens(normalize_text(full_name)):
                    matches.append(
                        TaxonomyMatch(
                            category=category_name,
                            class_name=class_name,
                            class_code=class_code,
                            type_description=type_description,
                            type_code=type_code,
                            annex=annex,
                            full_name=full_name,
                        )
                    )

            if class_name and contains_all_tokens(normalize_text(class_name)):
                matches.append(
                    TaxonomyMatch(
                        category=category_name,
                        class_name=class_name,
                        class_code=class_code,
                        annex=annex,
                        full_name=class_name,
                    )
                )
    return matches


# Shared retry policy: up to 3 attempts with exponential back-off on transport
# errors. httpx.TimeoutException is a subclass of httpx.RequestError, so one
# type covers both. reraise=True makes the final failure surface as the
# original httpx exception rather than tenacity.RetryError, which the
# scrapers' `except httpx.RequestError` handlers would not catch.
_retry_on_request_error = retry(
    retry=retry_if_exception_type(httpx.RequestError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4),
    reraise=True,
)


@_retry_on_request_error
async def fetch_text(client: httpx.AsyncClient, url: str) -> str:
    """GET *url* and return the response body as text.

    Raises:
        httpx.HTTPStatusError: on a 4xx/5xx response (not retried).
        httpx.RequestError: if every attempt fails at the transport level.
    """
    response = await client.get(url)
    response.raise_for_status()
    return response.text


@_retry_on_request_error
async def fetch_json(
    client: httpx.AsyncClient,
    url: str,
    data: dict[str, str],
) -> dict:
    """POST *data* form-encoded to *url* and return the decoded JSON body.

    Raises:
        httpx.HTTPStatusError: on a 4xx/5xx response (not retried).
        httpx.RequestError: if every attempt fails at the transport level.
        ValueError: if the response body is not valid JSON.
    """
    response = await client.post(
        url,
        data=data,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    response.raise_for_status()
    return response.json()


@_retry_on_request_error
async def fetch_json_get(client: httpx.AsyncClient, url: str) -> dict:
    """GET *url* and return the decoded JSON body.

    Raises:
        httpx.HTTPStatusError: on a 4xx/5xx response (not retried).
        httpx.RequestError: if every attempt fails at the transport level.
        ValueError: if the response body is not valid JSON.
    """
    response = await client.get(url)
    response.raise_for_status()
    return response.json()


async def scrape_dicionario_first(term: str) -> Optional[Definition]:
    """Return the first dicionariopetroleoegas.com.br hit for *term*.

    Fetches the search page, follows the first result link and extracts the
    article text. Returns ``None`` (after logging) when there is no result,
    the page lacks the expected elements, or an HTTP/network error occurs.
    """
    source = "dicionariopetroleoegas"
    search_url = f"https://dicionariopetroleoegas.com.br/?s={quote_plus(term)}"
    timeout = httpx.Timeout(10.0, connect=5.0)
    start_time = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            search_html = await fetch_text(client, search_url)
            search_parser = HTMLParser(search_html)
            first_link = search_parser.css_first("div.definitionlist a")
            if not first_link:
                logger.info(
                    "no_results",
                    source=source,
                    term=term,
                    url=search_url,
                    request_id=correlation_id.get(),
                )
                return None

            detail_url = first_link.attributes.get("href")
            if not detail_url:
                logger.warning(
                    "missing_detail_url",
                    source=source,
                    term=term,
                    url=search_url,
                    request_id=correlation_id.get(),
                )
                return None

            detail_html = await fetch_text(client, detail_url)
            detail_parser = HTMLParser(detail_html)
            article_node = detail_parser.css_first("div.maincontent article")
            if not article_node:
                logger.warning(
                    "missing_article",
                    source=source,
                    term=term,
                    url=detail_url,
                    request_id=correlation_id.get(),
                )
                return None

            definition_text = " ".join(article_node.text().split())
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.info(
                "scrape_success",
                source=source,
                term=term,
                url=detail_url,
                elapsed_ms=round(elapsed_ms, 2),
                request_id=correlation_id.get(),
            )
            title = first_link.attributes.get("title") or term
            return Definition(
                source=source,
                title=title,
                url=detail_url,
                definition=definition_text,
            )
    except httpx.HTTPStatusError as exc:
        logger.warning(
            "http_status_error",
            source=source,
            term=term,
            url=str(exc.request.url),
            status_code=exc.response.status_code,
            request_id=correlation_id.get(),
        )
    except httpx.RequestError as exc:
        logger.warning(
            "network_error",
            source=source,
            term=term,
            url=_request_url(exc, search_url),
            error=str(exc),
            request_id=correlation_id.get(),
        )
    return None


async def scrape_slb_first(term: str) -> Optional[Definition]:
    """Return the first SLB Oilfield Glossary hit for *term*.

    Queries the glossary's search endpoint, follows the first result's
    ``printableUri`` (or ``clickUri``) and extracts the definition text.
    Returns ``None`` (after logging) when there is no usable result, the
    search response is not JSON, or an HTTP/network error occurs.
    """
    source = "slb-glossary"
    search_url = "https://glossary.slb.com/coveo/rest/search/v2?siteName=OilfieldGlossary"
    payload = {
        "q": term,
        "aq": "(@z95xpath==28F6D9B16B684F7C9BE6937026AB0B6B)",
        "searchHub": "OilfieldGlossarySearchPage",
        "locale": "en",
        "pipeline": "SLBCom",
        "numberOfResults": "12",
    }
    timeout = httpx.Timeout(10.0, connect=5.0)
    start_time = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            try:
                search_json = await fetch_json(client, search_url, payload)
            except ValueError as exc:
                # A non-JSON body surfaces as json.JSONDecodeError (a ValueError).
                logger.warning(
                    "invalid_json",
                    source=source,
                    term=term,
                    url=search_url,
                    error=str(exc),
                    request_id=correlation_id.get(),
                )
                return None

            results = search_json.get("results", []) if isinstance(search_json, dict) else []
            if not results:
                logger.info(
                    "no_results",
                    source=source,
                    term=term,
                    url=search_url,
                    request_id=correlation_id.get(),
                )
                return None

            # Guard the shape before the first .get(); a non-dict entry then
            # falls through to the "missing_detail_url" branch.
            first_result = results[0] if isinstance(results[0], dict) else {}
            detail_url = first_result.get("printableUri") or first_result.get("clickUri")
            if not detail_url:
                logger.warning(
                    "missing_detail_url",
                    source=source,
                    term=term,
                    url=search_url,
                    request_id=correlation_id.get(),
                )
                return None

            detail_html = await fetch_text(client, detail_url)
            detail_parser = HTMLParser(detail_html)
            content_node = detail_parser.css_first("div.content-two-col__text")
            if not content_node:
                logger.warning(
                    "missing_article",
                    source=source,
                    term=term,
                    url=detail_url,
                    request_id=correlation_id.get(),
                )
                return None

            definition_text = " ".join(content_node.text().split())
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.info(
                "scrape_success",
                source=source,
                term=term,
                url=detail_url,
                elapsed_ms=round(elapsed_ms, 2),
                request_id=correlation_id.get(),
            )
            raw = _as_dict(first_result.get("raw"))
            title = raw.get("mainz32xtitle") or term
            return Definition(
                source=source,
                title=title,
                url=detail_url,
                definition=definition_text,
            )
    except httpx.HTTPStatusError as exc:
        logger.warning(
            "http_status_error",
            source=source,
            term=term,
            url=str(exc.request.url),
            status_code=exc.response.status_code,
            request_id=correlation_id.get(),
        )
    except httpx.RequestError as exc:
        logger.warning(
            "network_error",
            source=source,
            term=term,
            url=_request_url(exc, search_url),
            error=str(exc),
            request_id=correlation_id.get(),
        )
    return None


async def scrape_merriam_first(term: str) -> Optional[Definition]:
    """Return the first Merriam-Webster hit for *term*.

    Queries the autocomplete endpoint, builds the detail URL from the first
    document's ``slug`` and extracts the first ``span.dtText`` element.
    Returns ``None`` (after logging) when there is no usable result, the
    search response is not JSON, or an HTTP/network error occurs.
    """
    source = "merriam-webster"
    search_url = (
        "https://www.merriam-webster.com/lapi/v1/mwol-search/autocomplete"
        f"?search={quote_plus(term)}"
    )
    timeout = httpx.Timeout(10.0, connect=5.0)
    start_time = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            try:
                search_json = await fetch_json_get(client, search_url)
            except ValueError as exc:
                # A non-JSON body surfaces as json.JSONDecodeError (a ValueError).
                logger.warning(
                    "invalid_json",
                    source=source,
                    term=term,
                    url=search_url,
                    error=str(exc),
                    request_id=correlation_id.get(),
                )
                return None

            docs = search_json.get("docs", []) if isinstance(search_json, dict) else []
            if not docs:
                logger.info(
                    "no_results",
                    source=source,
                    term=term,
                    url=search_url,
                    request_id=correlation_id.get(),
                )
                return None

            first_doc = docs[0] if isinstance(docs[0], dict) else {}
            slug = first_doc.get("slug")
            title = first_doc.get("word") or term
            if not slug:
                logger.warning(
                    "missing_detail_url",
                    source=source,
                    term=term,
                    url=search_url,
                    request_id=correlation_id.get(),
                )
                return None

            # NOTE(review): assumes slug already starts with "/" - confirm.
            detail_url = f"https://www.merriam-webster.com{slug}"
            detail_html = await fetch_text(client, detail_url)
            detail_parser = HTMLParser(detail_html)
            content_node = detail_parser.css_first("span.dtText")
            if not content_node:
                logger.warning(
                    "missing_article",
                    source=source,
                    term=term,
                    url=detail_url,
                    request_id=correlation_id.get(),
                )
                return None

            definition_text = " ".join(content_node.text().split())
            # Drop any leading colon(s) from the extracted text.
            definition_text = definition_text.lstrip(":").strip()
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.info(
                "scrape_success",
                source=source,
                term=term,
                url=detail_url,
                elapsed_ms=round(elapsed_ms, 2),
                request_id=correlation_id.get(),
            )
            return Definition(
                source=source,
                title=title,
                url=detail_url,
                definition=definition_text,
            )
    except httpx.HTTPStatusError as exc:
        logger.warning(
            "http_status_error",
            source=source,
            term=term,
            url=str(exc.request.url),
            status_code=exc.response.status_code,
            request_id=correlation_id.get(),
        )
    except httpx.RequestError as exc:
        logger.warning(
            "network_error",
            source=source,
            term=term,
            url=_request_url(exc, search_url),
            error=str(exc),
            request_id=correlation_id.get(),
        )
    return None


# Liveness probe: always answers {"status": "ok"}.
@app.get("/api/health")
def health() -> dict:
    return {"status": "ok"}


# Look *term* up in all three glossaries concurrently and in the local
# taxonomy. A source that fails or finds nothing is omitted from `results`.
@app.get("/api/definitions", response_model=DefinitionResponse)
async def get_definitions(
    response: Response,
    term: str = Query(min_length=1),
) -> DefinitionResponse:
    request_id = correlation_id.get()
    if request_id:
        response.headers["X-Request-ID"] = request_id

    # return_exceptions=True keeps one scraper's unexpected failure from
    # discarding the results of the others.
    outcomes = await asyncio.gather(
        scrape_dicionario_first(term),
        scrape_slb_first(term),
        scrape_merriam_first(term),
        return_exceptions=True,
    )
    results: List[Definition] = []
    for outcome in outcomes:
        if isinstance(outcome, BaseException):
            if not isinstance(outcome, Exception):
                # Never swallow cancellation or interpreter-exit signals.
                raise outcome
            logger.error(
                "scraper_failed",
                term=term,
                error=repr(outcome),
                request_id=request_id,
            )
        elif outcome:
            results.append(outcome)

    taxonomy = find_taxonomy_matches(term)
    return DefinitionResponse(
        term=term,
        results=results,
        request_id=request_id,
        taxonomy=taxonomy,
    )