# apitdn/fluig_extractor.py
# Async crawler that exports the TOTVS Fluig TDN Confluence space to a local
# Markdown wiki (with images), suitable for offline use / RAG ingestion.
import asyncio
import glob
import hashlib
import json
import os
import re
import time
from urllib.parse import urljoin, urlparse

import aiohttp
from bs4 import BeautifulSoup
from markdownify import markdownify as md
# Configurações
BASE_URL = "https://tdn.totvs.com"
API_URL = f"{BASE_URL}/rest/api/content"
ROOT_PAGE_ID = "653566687" # Documentação Técnica
OUTPUT_DIR = "fluig_rag_docs"
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")
CONCURRENCY_LIMIT = 10
DELAY = 0.1
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
# Arquivos de controle
PROGRESS_FILE = "extraction_progress.json"
URL_MAP_FILE = "url_to_path_map.json"
semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)
async def fetch_json(session, url, params=None):
    """GET *url* and return the decoded JSON body, or None on failure.

    Concurrency is bounded by the module-level semaphore. A 404 response and
    any network/timeout error are treated as "no data" (returns None) so the
    crawl can continue past missing or unreachable pages.
    """
    async with semaphore:
        try:
            async with session.get(url, params=params, headers=HEADERS, timeout=30) as response:
                if response.status == 404:
                    return None
                response.raise_for_status()
                return await response.json()
        # Narrowed from a bare `except Exception`, which also hid programming
        # errors (NameError, TypeError, ...) behind a log line.
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            print(f"Erro ao buscar {url}: {e}")
            return None
async def get_page_children(session, page_id):
    """Return the list of direct child pages of *page_id* (empty on error)."""
    data = await fetch_json(session, f"{API_URL}/{page_id}/child/page")
    if not data:
        return []
    return data.get('results', [])
async def get_page_content(session, page_id):
    """Fetch one page with its exported HTML body and last-update metadata.

    Returns a (title, html, web_ui, tiny_ui, last_updated) tuple; every
    element is None when the page could not be fetched.
    """
    url = f"{API_URL}/{page_id}?expand=body.export_view,history.lastUpdated"
    data = await fetch_json(session, url)
    if not data:
        return None, None, None, None, None
    links = data.get('_links', {})
    return (
        data.get('title'),
        data.get('body', {}).get('export_view', {}).get('value', ''),
        links.get('webui', ""),
        links.get('tinyui', ""),
        data.get('history', {}).get('lastUpdated', {}).get('when', ''),
    )
def clean_filename(filename):
    """Strip characters that are illegal in Windows/Unix file names."""
    forbidden = r'[\\/*?:"<>|]'
    return re.sub(forbidden, "", filename)
async def download_image(session, img_url):
    """Download an image into IMAGES_DIR and return its OUTPUT_DIR-relative path.

    On any failure the original remote URL is returned unchanged, so the
    document keeps a working (remote) reference.
    """
    if not img_url.startswith("http"):
        img_url = urljoin(BASE_URL, img_url)
    parsed_url = urlparse(img_url)
    img_name = clean_filename(os.path.basename(parsed_url.path))
    if not img_name:
        # Fallback name. Use a stable digest instead of hash(): str hashing is
        # salted per interpreter run (PYTHONHASHSEED), so hash()-based names
        # changed every run and defeated the on-disk cache check below.
        img_name = f"img_{hashlib.md5(img_url.encode('utf-8')).hexdigest()}.png"
    local_path = os.path.join(IMAGES_DIR, img_name)
    if os.path.exists(local_path):
        # Already downloaded by a previous run or another page.
        return os.path.join("images", img_name)
    async with semaphore:
        try:
            async with session.get(img_url, headers=HEADERS, timeout=20) as response:
                if response.status != 200:
                    return img_url
                img_data = await response.read()
            os.makedirs(IMAGES_DIR, exist_ok=True)  # main() creates it, but be safe
            with open(local_path, "wb") as f:
                f.write(img_data)
            return os.path.join("images", img_name)
        # Include OSError so a filesystem failure is logged, not fatal.
        except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as e:
            print(f"Erro ao baixar imagem {img_url}: {e}")
            return img_url
def treat_macros(soup):
    """Rewrite Confluence-specific macros in *soup* so they convert cleanly to Markdown.

    Mutates the soup in place and also returns it.
    """
    # Expand macro -> HTML <details>/<summary>.
    for expand_div in soup.find_all("div", class_="expand-container"):
        title_div = expand_div.find("div", class_="expand-control")
        content_div = expand_div.find("div", class_="expand-content")
        if title_div and content_div:
            title_text = title_div.get_text(strip=True) or "Expandir"
            new_tag = soup.new_tag("details")
            summary = soup.new_tag("summary")
            summary.string = title_text
            new_tag.append(summary)
            # Preserve inner content: appending re-parents content_div
            # out of expand_div and into the new <details>.
            new_tag.append(content_div)
            expand_div.replace_with(new_tag)
    # Info/Warning macros: Confluence CSS class -> admonition type keyword.
    macro_mapping = {
        "confluence-information-macro-information": "info",
        "confluence-information-macro-note": "note",
        "confluence-information-macro-warning": "warning",
        "confluence-information-macro-tip": "tip"
    }
    for macro in soup.find_all("div", class_="confluence-information-macro"):
        m_type = "info"  # default when no specific class matches
        classes = macro.get("class", [])
        for cls, target in macro_mapping.items():
            if cls in classes:
                m_type = target
                break
        # Inject a marker we can convert later, wrapped in a blockquote.
        # markdownify renders <blockquote> as "> ..."; the "!!! type" marker is
        # intended for MkDocs-admonition-style post-processing downstream.
        title_span = macro.find("span", class_="confluence-information-macro-title")
        # NOTE(review): `title` is built but never used below — dead code?
        title = f"**{title_span.get_text(strip=True)}**\n\n" if title_span else ""
        blockquote = soup.new_tag("blockquote")
        content_body = macro.find("div", class_="confluence-information-macro-body")
        if content_body:
            # Prefix with the type to ease admonition post-processing.
            content_body.insert(0, BeautifulSoup(f"<p>!!! {m_type}</p>", "html.parser"))
            blockquote.append(content_body)
        # A macro without a body div becomes an empty blockquote.
        macro.replace_with(blockquote)
    return soup
def sanitize_code_blocks(markdown_content):
    """Infer a language tag for fenced code blocks missing (or mislabeled with) one."""
    fence_re = re.compile(r"```(.*?)\n(.*?)\n```", re.DOTALL)

    def _retag(match):
        lang = match.group(1).strip()
        code = match.group(2)
        # Confluence frequently tags Fluig JavaScript snippets as "java".
        if not lang or lang == "java":
            js_markers = ("DatasetBuilder", "createDataset", "displayFields", "getSelectedZoomItem")
            java_markers = ("PreparedStatement", "ResultSet", "DriverManager")
            shouted = code.upper()
            if any(marker in code for marker in js_markers):
                lang = "javascript"
            elif any(marker in code for marker in java_markers):
                lang = "java"
            elif "SELECT" in shouted and "FROM" in shouted:
                lang = "sql"
        return f"```{lang}\n{code}\n```"

    return fence_re.sub(_retag, markdown_content)
async def process_links_and_images(session, html, current_file_dir, url_map):
    """Process HTML: treat macros, download every <img>, rewrite srcs to local paths."""
    soup = treat_macros(BeautifulSoup(html, "html.parser"))
    pending = [(img, img.get("src")) for img in soup.find_all("img") if img.get("src")]
    if pending:
        # Fetch all images for the page concurrently.
        local_paths = await asyncio.gather(
            *(download_image(session, src) for _, src in pending)
        )
        for (img, _), local_path in zip(pending, local_paths):
            if local_path.startswith("http"):
                # Download failed; keep the remote URL untouched.
                continue
            img["src"] = os.path.relpath(os.path.join(OUTPUT_DIR, local_path), current_file_dir)
    return str(soup)
def save_markdown(path, title, html_content, source_url):
    """Convert *html_content* to Markdown and write it to *path* with YAML front matter.

    Creates parent directories as needed.
    """
    os.makedirs(os.path.dirname(path), exist_ok=True)
    # HTML -> Markdown.
    markdown_content = md(html_content, heading_style="ATX", bullets="-")
    # Promote the "!!! type" markers injected by treat_macros() out of the
    # blockquote so admonition syntax is recognized.
    markdown_content = markdown_content.replace("> !!!", "!!!")
    # Fix missing/mislabeled fence languages.
    markdown_content = sanitize_code_blocks(markdown_content)
    # Strip only the *leading* OUTPUT_DIR — str.replace() also removed any
    # later occurrence of the same substring inside the path.
    rel_path = path[len(OUTPUT_DIR):] if path.startswith(OUTPUT_DIR) else path
    # json.dumps yields a quoted, escaped scalar that is also valid YAML, so
    # titles containing ':' or '"' no longer break the front matter.
    header = (
        "---\n"
        f"title: {json.dumps(title, ensure_ascii=False)}\n"
        f"source: {source_url}\n"
        f"path: {rel_path}\n"
        "---\n\n"
    )
    with open(path, "w", encoding="utf-8") as f:
        f.write(header + markdown_content)
async def build_tree_map(session, page_id, current_path, url_map):
    """Pass 1: recursively map every URL variant of each page to its local .md path."""
    title, _, web_ui, tiny_ui, _ = await get_page_content(session, page_id)
    if not title:
        return
    safe_title = clean_filename(title)
    file_path = os.path.join(current_path, f"{safe_title}.md")
    # A page is reachable through several URL shapes; map them all.
    candidates = [web_ui, urljoin(BASE_URL, web_ui), tiny_ui, urljoin(BASE_URL, tiny_ui)]
    if web_ui:
        public_ui = web_ui.replace("/display/fluig/", "/display/public/fluig/")
        candidates += [public_ui, urljoin(BASE_URL, public_ui)]
    id_ui = f"/pages/viewpage.action?pageId={page_id}"
    candidates += [id_ui, urljoin(BASE_URL, id_ui)]
    for candidate in candidates:
        if candidate:
            url_map[candidate] = file_path
    print(f"Mapeando: {title}")
    # Children live in a directory named after this page.
    children = await get_page_children(session, page_id)
    child_root = os.path.join(current_path, safe_title)
    await asyncio.gather(
        *(build_tree_map(session, child['id'], child_root, url_map) for child in children)
    )
async def extract_content(session, page_id, current_path, progress_data, url_map):
    """Pass 2: download page content and images, skipping pages unchanged since last run."""
    title, html, web_ui, tiny_ui, last_updated = await get_page_content(session, page_id)
    if not title:
        return
    safe_title = clean_filename(title)
    file_path = os.path.join(current_path, f"{safe_title}.md")
    # Incremental extraction: skip when the stored timestamp matches and the
    # output file still exists on disk.
    is_unchanged = (
        page_id in progress_data
        and progress_data[page_id] == last_updated
        and os.path.exists(file_path)
    )
    if is_unchanged:
        print(f"Pulando (Inalterado): {title}")
    else:
        print(f"Extraindo: {title}")
        processed_html = await process_links_and_images(session, html, current_path, url_map)
        source_url = urljoin(BASE_URL, web_ui) if web_ui else ""
        save_markdown(file_path, title, processed_html, source_url)
        # Persist progress after every page so an interrupted run can resume.
        progress_data[page_id] = last_updated
        with open(PROGRESS_FILE, "w") as f:
            json.dump(progress_data, f)
    # Recurse into children under a directory named after this page.
    children = await get_page_children(session, page_id)
    child_root = os.path.join(current_path, safe_title)
    await asyncio.gather(
        *(extract_content(session, child['id'], child_root, progress_data, url_map)
          for child in children)
    )
def post_process_links(url_map):
    """Rewrite absolute TDN links in every generated .md file into relative local links.

    url_map: remote URL (several variants per page) -> local .md path.
    """
    print("\nIniciando pós-processamento de links locais...")
    # Longest URLs first so the most specific variant wins substring matching.
    sorted_urls = sorted(url_map.keys(), key=len, reverse=True)
    all_files = glob.glob(os.path.join(OUTPUT_DIR, "**", "*.md"), recursive=True)
    # Markdown link regex that tolerates one level of nested parentheses in the URL.
    link_pattern = re.compile(r"\[(.*?)\]\(((?:[^()]+|\([^()]*\))*)\)")
    for local_file_path in all_files:
        if not os.path.isfile(local_file_path): continue
        with open(local_file_path, "r", encoding="utf-8") as f:
            content = f.read()
        original_content = content
        current_dir = os.path.dirname(local_file_path)
        def replace_url_in_link(match):
            text = match.group(1)
            url = match.group(2)
            for target_url in sorted_urls:
                if target_url in url:
                    target_local_path = url_map[target_url]
                    # Skip self-links; fall through to shorter URL variants.
                    if os.path.abspath(target_local_path) == os.path.abspath(local_file_path): continue
                    rel_link = os.path.relpath(target_local_path, current_dir).replace("\\", "/")
                    # Link points *below* a mapped page: the parent page was
                    # saved as Parent.md but its children live in Parent/, so
                    # swap the .md suffix for the directory and keep the tail.
                    if url.startswith(target_url + "/"):
                        parent_dir = rel_link.replace(".md", "")
                        new_url = url.replace(target_url, parent_dir)
                        return f"[{text}]({new_url})"
                    return f"[{text}]({rel_link})"
            # No mapped URL matched: leave the link untouched.
            return match.group(0)
        content = link_pattern.sub(replace_url_in_link, content)
        # Only rewrite the file when something actually changed.
        if content != original_content:
            with open(local_file_path, "w", encoding="utf-8") as f:
                f.write(content)
async def main():
    """Drive the two-phase extraction: URL-tree mapping, then content download."""
    # exist_ok=True replaces the check-then-create pair (idiomatic and race-free).
    os.makedirs(IMAGES_DIR, exist_ok=True)
    async with aiohttp.ClientSession() as session:
        # Phase 1: build (or reload) the URL -> local-path map.
        url_map = {}
        if os.path.exists(URL_MAP_FILE):
            with open(URL_MAP_FILE, "r") as f:
                url_map = json.load(f)
        else:
            print("Fase 1: Mapeando árvore de URLs...")
            await build_tree_map(session, ROOT_PAGE_ID, OUTPUT_DIR, url_map)
            with open(URL_MAP_FILE, "w") as f:
                json.dump(url_map, f)
        # Phase 2: incremental content + image extraction.
        progress_data = {}
        if os.path.exists(PROGRESS_FILE):
            with open(PROGRESS_FILE, "r") as f:
                progress_data = json.load(f)
        print("\nFase 2: Extraindo conteúdo e imagens...")
        await extract_content(session, ROOT_PAGE_ID, OUTPUT_DIR, progress_data, url_map)
        # Phase 3: convert absolute links to relative local ones.
        post_process_links(url_map)
        print("\nWiki local 100% Offline concluída!")
# Script entry point: run the async pipeline.
if __name__ == "__main__":
    asyncio.run(main())