# ontology-term-search/backend/app/main.py
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
from functools import lru_cache
from pathlib import Path
from typing import List, Optional
from urllib.parse import quote_plus
import httpx
import structlog
from asgi_correlation_id import CorrelationIdMiddleware
from asgi_correlation_id.context import correlation_id
from fastapi import FastAPI, Query, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from selectolax.parser import HTMLParser
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


class Definition(BaseModel):
    source: str
    title: str
    url: str
    definition: str


class TaxonomyMatch(BaseModel):
    category: str
    class_name: str
    class_code: str
    type_description: Optional[str] = None
    type_code: Optional[str] = None
    annex: Optional[str] = None
    full_name: str


class DefinitionResponse(BaseModel):
    term: str
    results: List[Definition]
    request_id: Optional[str] = None
    taxonomy: List[TaxonomyMatch] = Field(default_factory=list)
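

# A /api/definitions response serializes roughly as:
#   {
#     "term": "...",
#     "results": [{"source": "...", "title": "...", "url": "...", "definition": "..."}],
#     "request_id": "...",
#     "taxonomy": [{"category": "...", "class_name": "...", "class_code": "...", ...}]
#   }
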
app = FastAPI(title="TermSearch API", version="0.1.0")
logging.basicConfig(format="%(message)s", level=logging.INFO)
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.EventRenamer("event"),
        structlog.processors.JSONRenderer(),
    ]
)
logger = structlog.get_logger("termsearch")
app.add_middleware(
    CorrelationIdMiddleware,
    header_name="X-Request-ID",
)
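
# FRONTEND_ORIGIN may hold a single origin or a comma-separated list, e.g.
# FRONTEND_ORIGIN="http://localhost:5173,https://app.example.com"
# (the second hostname is illustrative).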
frontend_origin = os.getenv("FRONTEND_ORIGIN", "http://localhost:5173")
allowed_origins = [origin.strip() for origin in frontend_origin.split(",") if origin.strip()]
app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def normalize_text(text: str) -> str:
    """Lowercase and collapse internal whitespace for loose matching."""
    return " ".join(text.lower().split())


@lru_cache(maxsize=1)
def load_taxonomy() -> dict:
    """Load the ISO taxonomy JSON once and cache it for the process lifetime."""
    root_dir = Path(__file__).resolve().parents[1]
    tax_path = root_dir / "data" / "iso" / "iso-14244-tax.json"
    if not tax_path.exists():
        # Fallback for deployments where the data directory is mounted at /data.
        tax_path = Path("/data/iso/iso-14244-tax.json")
    with tax_path.open("r", encoding="utf-8") as handle:
        return json.load(handle)
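

# find_taxonomy_matches below assumes the JSON has this shape, inferred from
# the access pattern (the real file may carry additional fields):
#
#   {"taxonomy": {"categories": {
#       "<category name>": {"classes": {
#           "<class code>": {
#               "name": "...", "annex": "...",
#               "types": {"<type code>": {"description": "..."}}}}}}}}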
def find_taxonomy_matches(term: str) -> List[TaxonomyMatch]:
    """Return taxonomy entries whose display names contain every token of the term."""
    normalized_term = normalize_text(term)
    term_tokens = normalized_term.split()
    if not term_tokens:
        return []
    data = load_taxonomy()
    taxonomy = data.get("taxonomy", {}) if isinstance(data, dict) else {}
    categories = taxonomy.get("categories", {}) if isinstance(taxonomy, dict) else {}
    matches: List[TaxonomyMatch] = []
    for category_name, category in categories.items():
        classes = category.get("classes", {}) if isinstance(category, dict) else {}
        for class_code, class_info in classes.items():
            # Skip malformed entries before any attribute access.
            if not isinstance(class_info, dict):
                continue
            class_name = class_info.get("name", "")
            annex = class_info.get("annex")
            types = class_info.get("types", {})
            for type_code, type_info in types.items():
                type_description = type_info.get("description", "")
                full_name = f"{type_description} {class_name}".strip()
                full_name_normalized = normalize_text(full_name)
                # Substring containment per token: "pipe" also matches "pipeline".
                if all(token in full_name_normalized for token in term_tokens):
                    matches.append(
                        TaxonomyMatch(
                            category=category_name,
                            class_name=class_name,
                            class_code=class_code,
                            type_description=type_description,
                            type_code=type_code,
                            annex=annex,
                            full_name=full_name,
                        )
                    )
            # Also match on the bare class name, with no type qualifier.
            class_name_normalized = normalize_text(class_name)
            if class_name and all(token in class_name_normalized for token in term_tokens):
                matches.append(
                    TaxonomyMatch(
                        category=category_name,
                        class_name=class_name,
                        class_code=class_code,
                        annex=annex,
                        full_name=class_name,
                    )
                )
    return matches


@retry(
    retry=retry_if_exception_type((httpx.RequestError, httpx.TimeoutException)),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4),
    reraise=True,
)
async def fetch_text(client: httpx.AsyncClient, url: str) -> str:
    response = await client.get(url)
    response.raise_for_status()
    return response.text
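

# fetch_text, fetch_json, and fetch_json_get share one retry policy: up to
# three attempts on connection/timeout errors with exponential backoff capped
# at 4 s. reraise=True makes tenacity re-raise the final httpx exception
# instead of wrapping it in RetryError, so the except httpx.* handlers in the
# scrapers below still match. (httpx.TimeoutException is already a subclass of
# httpx.RequestError; listing it separately is redundant but harmless.)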
@retry(
    retry=retry_if_exception_type((httpx.RequestError, httpx.TimeoutException)),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4),
    reraise=True,
)
async def fetch_json(
    client: httpx.AsyncClient,
    url: str,
    data: dict[str, str],
) -> dict:
    # httpx would set this content type itself for a dict passed as data=;
    # keeping it explicit documents the wire format.
    response = await client.post(
        url,
        data=data,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    response.raise_for_status()
    return response.json()


@retry(
    retry=retry_if_exception_type((httpx.RequestError, httpx.TimeoutException)),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4),
    reraise=True,
)
async def fetch_json_get(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url)
    response.raise_for_status()
    return response.json()


async def scrape_dicionario_first(term: str) -> Optional[Definition]:
    """Search dicionariopetroleoegas.com.br and scrape the first result's article."""
    search_url = f"https://dicionariopetroleoegas.com.br/?s={quote_plus(term)}"
    timeout = httpx.Timeout(10.0, connect=5.0)
    start_time = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            search_html = await fetch_text(client, search_url)
            search_parser = HTMLParser(search_html)
            # The results page lists entries under div.definitionlist; take the first link.
            first_link = search_parser.css_first("div.definitionlist a")
            if not first_link:
                logger.info(
                    "no_results",
                    source="dicionariopetroleoegas",
                    term=term,
                    url=search_url,
                    request_id=correlation_id.get(),
                )
                return None
            detail_url = first_link.attributes.get("href")
            if not detail_url:
                logger.warning(
                    "missing_detail_url",
                    source="dicionariopetroleoegas",
                    term=term,
                    url=search_url,
                    request_id=correlation_id.get(),
                )
                return None
            detail_html = await fetch_text(client, detail_url)
            detail_parser = HTMLParser(detail_html)
            article_node = detail_parser.css_first("div.maincontent article")
            if not article_node:
                logger.warning(
                    "missing_article",
                    source="dicionariopetroleoegas",
                    term=term,
                    url=detail_url,
                    request_id=correlation_id.get(),
                )
                return None
            definition_text = " ".join(article_node.text().split())
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.info(
                "scrape_success",
                source="dicionariopetroleoegas",
                term=term,
                url=detail_url,
                elapsed_ms=round(elapsed_ms, 2),
                request_id=correlation_id.get(),
            )
            title = first_link.attributes.get("title") or term
            return Definition(
                source="dicionariopetroleoegas",
                title=title,
                url=detail_url,
                definition=definition_text,
            )
    except httpx.HTTPStatusError as exc:
        logger.warning(
            "http_status_error",
            source="dicionariopetroleoegas",
            term=term,
            url=str(exc.request.url),
            status_code=exc.response.status_code,
            request_id=correlation_id.get(),
        )
    except httpx.RequestError as exc:
        logger.warning(
            "network_error",
            source="dicionariopetroleoegas",
            term=term,
            url=str(exc.request.url) if exc.request else search_url,
            error=str(exc),
            request_id=correlation_id.get(),
        )
    return None
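

# Quick standalone check from a REPL (the term is illustrative):
#   asyncio.run(scrape_dicionario_first("riser"))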


async def scrape_slb_first(term: str) -> Optional[Definition]:
    """Query the SLB glossary's Coveo search API, then scrape the first hit's page."""
    search_url = "https://glossary.slb.com/coveo/rest/search/v2?siteName=OilfieldGlossary"
    # Coveo for Sitecore encodes special characters in field names as zNNx
    # (NN = the ASCII code): "@z95xpath" is "@_path", and "mainz32xtitle"
    # below is "main title".
    payload = {
        "q": term,
        "aq": "(@z95xpath==28F6D9B16B684F7C9BE6937026AB0B6B)",
        "searchHub": "OilfieldGlossarySearchPage",
        "locale": "en",
        "pipeline": "SLBCom",
        "numberOfResults": "12",
    }
    timeout = httpx.Timeout(10.0, connect=5.0)
    start_time = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            search_json = await fetch_json(client, search_url, payload)
            results = search_json.get("results", []) if isinstance(search_json, dict) else []
            if not results:
                logger.info(
                    "no_results",
                    source="slb-glossary",
                    term=term,
                    url=search_url,
                    request_id=correlation_id.get(),
                )
                return None
            first_result = results[0] if isinstance(results[0], dict) else {}
            detail_url = first_result.get("printableUri") or first_result.get("clickUri")
            if not detail_url:
                logger.warning(
                    "missing_detail_url",
                    source="slb-glossary",
                    term=term,
                    url=search_url,
                    request_id=correlation_id.get(),
                )
                return None
            detail_html = await fetch_text(client, detail_url)
            detail_parser = HTMLParser(detail_html)
            content_node = detail_parser.css_first("div.content-two-col__text")
            if not content_node:
                logger.warning(
                    "missing_article",
                    source="slb-glossary",
                    term=term,
                    url=detail_url,
                    request_id=correlation_id.get(),
                )
                return None
            definition_text = " ".join(content_node.text().split())
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.info(
                "scrape_success",
                source="slb-glossary",
                term=term,
                url=detail_url,
                elapsed_ms=round(elapsed_ms, 2),
                request_id=correlation_id.get(),
            )
            raw = first_result.get("raw", {})
            title = raw.get("mainz32xtitle") or term
            return Definition(
                source="slb-glossary",
                title=title,
                url=detail_url,
                definition=definition_text,
            )
    except httpx.HTTPStatusError as exc:
        logger.warning(
            "http_status_error",
            source="slb-glossary",
            term=term,
            url=str(exc.request.url),
            status_code=exc.response.status_code,
            request_id=correlation_id.get(),
        )
    except httpx.RequestError as exc:
        logger.warning(
            "network_error",
            source="slb-glossary",
            term=term,
            url=str(exc.request.url) if exc.request else search_url,
            error=str(exc),
            request_id=correlation_id.get(),
        )
    return None


async def scrape_merriam_first(term: str) -> Optional[Definition]:
    """Resolve the term via Merriam-Webster's autocomplete API, then scrape the first sense."""
    search_url = (
        "https://www.merriam-webster.com/lapi/v1/mwol-search/autocomplete"
        f"?search={quote_plus(term)}"
    )
    timeout = httpx.Timeout(10.0, connect=5.0)
    start_time = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            search_json = await fetch_json_get(client, search_url)
            docs = search_json.get("docs", []) if isinstance(search_json, dict) else []
            if not docs:
                logger.info(
                    "no_results",
                    source="merriam-webster",
                    term=term,
                    url=search_url,
                    request_id=correlation_id.get(),
                )
                return None
            first_doc = docs[0] if isinstance(docs[0], dict) else {}
            slug = first_doc.get("slug")
            title = first_doc.get("word") or term
            if not slug:
                logger.warning(
                    "missing_detail_url",
                    source="merriam-webster",
                    term=term,
                    url=search_url,
                    request_id=correlation_id.get(),
                )
                return None
            detail_url = f"https://www.merriam-webster.com{slug}"
            detail_html = await fetch_text(client, detail_url)
            detail_parser = HTMLParser(detail_html)
            content_node = detail_parser.css_first("span.dtText")
            if not content_node:
                logger.warning(
                    "missing_article",
                    source="merriam-webster",
                    term=term,
                    url=detail_url,
                    request_id=correlation_id.get(),
                )
                return None
            definition_text = " ".join(content_node.text().split())
            # Merriam renders each sense with a leading ": " marker; drop it.
            definition_text = definition_text.lstrip(":").strip()
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.info(
                "scrape_success",
                source="merriam-webster",
                term=term,
                url=detail_url,
                elapsed_ms=round(elapsed_ms, 2),
                request_id=correlation_id.get(),
            )
            return Definition(
                source="merriam-webster",
                title=title,
                url=detail_url,
                definition=definition_text,
            )
    except httpx.HTTPStatusError as exc:
        logger.warning(
            "http_status_error",
            source="merriam-webster",
            term=term,
            url=str(exc.request.url),
            status_code=exc.response.status_code,
            request_id=correlation_id.get(),
        )
    except httpx.RequestError as exc:
        logger.warning(
            "network_error",
            source="merriam-webster",
            term=term,
            url=str(exc.request.url) if exc.request else search_url,
            error=str(exc),
            request_id=correlation_id.get(),
        )
    return None
@app.get("/api/health")
def health() -> dict:
return {"status": "ok"}
@app.get("/api/definitions", response_model=DefinitionResponse)
async def get_definitions(
response: Response,
term: str = Query(min_length=1),
) -> DefinitionResponse:
request_id = correlation_id.get()
if request_id:
response.headers["X-Request-ID"] = request_id
results = [
result
for result in await asyncio.gather(
scrape_dicionario_first(term),
scrape_slb_first(term),
scrape_merriam_first(term),
)
if result
]
taxonomy = find_taxonomy_matches(term)
return DefinitionResponse(
term=term,
results=results,
request_id=request_id,
taxonomy=taxonomy,
)
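

# Running locally (assuming this file lives at backend/app/main.py, serve
# from backend/):
#   uvicorn app.main:app --reload
#
# Example request (port is uvicorn's default):
#   curl -s "http://localhost:8000/api/definitions?term=manifold"
#
# The X-Request-ID response header echoes the correlation id attached to
# every structured log line emitted for the request.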