326 lines
12 KiB
Python
326 lines
12 KiB
Python
import os
|
|
import asyncio
|
|
import aiohttp
|
|
import time
|
|
from bs4 import BeautifulSoup
|
|
from markdownify import markdownify as md
|
|
import re
|
|
import json
|
|
from urllib.parse import urljoin, urlparse
|
|
import glob
|
|
|
|
# Configurações
|
|
BASE_URL = "https://tdn.totvs.com"
|
|
API_URL = f"{BASE_URL}/rest/api/content"
|
|
ROOT_PAGE_ID = "653566687" # Documentação Técnica
|
|
OUTPUT_DIR = "fluig_rag_docs"
|
|
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")
|
|
CONCURRENCY_LIMIT = 10
|
|
DELAY = 0.1
|
|
|
|
HEADERS = {
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
|
}
|
|
|
|
# Arquivos de controle
|
|
PROGRESS_FILE = "extraction_progress.json"
|
|
URL_MAP_FILE = "url_to_path_map.json"
|
|
|
|
semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)
|
|
|
|
async def fetch_json(session, url, params=None):
|
|
async with semaphore:
|
|
try:
|
|
async with session.get(url, params=params, headers=HEADERS, timeout=30) as response:
|
|
if response.status == 404:
|
|
return None
|
|
response.raise_for_status()
|
|
return await response.json()
|
|
except Exception as e:
|
|
print(f"Erro ao buscar {url}: {e}")
|
|
return None
|
|
|
|
async def get_page_children(session, page_id):
|
|
url = f"{API_URL}/{page_id}/child/page"
|
|
data = await fetch_json(session, url)
|
|
return data.get('results', []) if data else []
|
|
|
|
async def get_page_content(session, page_id):
|
|
url = f"{API_URL}/{page_id}?expand=body.export_view,history.lastUpdated"
|
|
data = await fetch_json(session, url)
|
|
if not data:
|
|
return None, None, None, None, None
|
|
|
|
title = data.get('title')
|
|
html = data.get('body', {}).get('export_view', {}).get('value', '')
|
|
last_updated = data.get('history', {}).get('lastUpdated', {}).get('when', '')
|
|
|
|
links = data.get('_links', {})
|
|
web_ui = links.get('webui', "")
|
|
tiny_ui = links.get('tinyui', "")
|
|
|
|
return title, html, web_ui, tiny_ui, last_updated
|
|
|
|
def clean_filename(filename):
|
|
return re.sub(r'[\\/*?:"<>|]', "", filename)
|
|
|
|
async def download_image(session, img_url):
|
|
"""Baixa uma imagem e retorna o caminho local relativo."""
|
|
if not img_url.startswith("http"):
|
|
img_url = urljoin(BASE_URL, img_url)
|
|
|
|
parsed_url = urlparse(img_url)
|
|
img_name = clean_filename(os.path.basename(parsed_url.path))
|
|
if not img_name:
|
|
img_name = f"img_{hash(img_url)}.png"
|
|
|
|
local_path = os.path.join(IMAGES_DIR, img_name)
|
|
|
|
if os.path.exists(local_path):
|
|
return os.path.join("images", img_name)
|
|
|
|
async with semaphore:
|
|
try:
|
|
async with session.get(img_url, headers=HEADERS, timeout=20) as response:
|
|
if response.status != 200: return img_url
|
|
img_data = await response.read()
|
|
with open(local_path, "wb") as f:
|
|
f.write(img_data)
|
|
return os.path.join("images", img_name)
|
|
except Exception as e:
|
|
print(f"Erro ao baixar imagem {img_url}: {e}")
|
|
return img_url
|
|
|
|
def treat_macros(soup):
|
|
"""Trata macros específicas do Confluence para melhorar o Markdown."""
|
|
# Expand macro -> HTML details
|
|
for expand_div in soup.find_all("div", class_="expand-container"):
|
|
title_div = expand_div.find("div", class_="expand-control")
|
|
content_div = expand_div.find("div", class_="expand-content")
|
|
if title_div and content_div:
|
|
title_text = title_div.get_text(strip=True) or "Expandir"
|
|
new_tag = soup.new_tag("details")
|
|
summary = soup.new_tag("summary")
|
|
summary.string = title_text
|
|
new_tag.append(summary)
|
|
# Preservar conteúdo interno
|
|
new_tag.append(content_div)
|
|
expand_div.replace_with(new_tag)
|
|
|
|
# Info/Warning macros
|
|
macro_mapping = {
|
|
"confluence-information-macro-information": "info",
|
|
"confluence-information-macro-note": "note",
|
|
"confluence-information-macro-warning": "warning",
|
|
"confluence-information-macro-tip": "tip"
|
|
}
|
|
for macro in soup.find_all("div", class_="confluence-information-macro"):
|
|
m_type = "info"
|
|
classes = macro.get("class", [])
|
|
for cls, target in macro_mapping.items():
|
|
if cls in classes:
|
|
m_type = target
|
|
break
|
|
|
|
# Injetar uma marcação que podemos converter depois ou deixar como blockquote
|
|
# O markdownify converterá <blockquote> para >
|
|
# Vamos tentar algo que o MkDocs Admonition reconheça se possível,
|
|
# ou apenas deixar mais legível.
|
|
title_span = macro.find("span", class_="confluence-information-macro-title")
|
|
title = f"**{title_span.get_text(strip=True)}**\n\n" if title_span else ""
|
|
|
|
blockquote = soup.new_tag("blockquote")
|
|
content_body = macro.find("div", class_="confluence-information-macro-body")
|
|
if content_body:
|
|
# Prefixar com o tipo para facilitar pós-processamento de Admonition
|
|
content_body.insert(0, BeautifulSoup(f"<p>!!! {m_type}</p>", "html.parser"))
|
|
blockquote.append(content_body)
|
|
macro.replace_with(blockquote)
|
|
|
|
return soup
|
|
|
|
def sanitize_code_blocks(markdown_content):
|
|
"""Tenta inferir a linguagem de blocos de código sem linguagem definida."""
|
|
def replace_code(match):
|
|
lang = match.group(1).strip()
|
|
code = match.group(2)
|
|
|
|
if not lang or lang == "java": # Confluence às vezes erra java vs js
|
|
if any(x in code for x in ["DatasetBuilder", "createDataset", "displayFields", "getSelectedZoomItem"]):
|
|
lang = "javascript"
|
|
elif any(x in code for x in ["PreparedStatement", "ResultSet", "DriverManager"]):
|
|
lang = "java"
|
|
elif "SELECT" in code.upper() and "FROM" in code.upper():
|
|
lang = "sql"
|
|
|
|
return f"```{lang}\n{code}\n```"
|
|
|
|
pattern = re.compile(r"```(.*?)\n(.*?)\n```", re.DOTALL)
|
|
return pattern.sub(replace_code, markdown_content)
|
|
|
|
async def process_links_and_images(session, html, current_file_dir, url_map):
|
|
"""Processa HTML para baixar imagens e preparar links locais."""
|
|
soup = BeautifulSoup(html, "html.parser")
|
|
soup = treat_macros(soup)
|
|
|
|
tasks = []
|
|
imgs_to_process = []
|
|
for img in soup.find_all("img"):
|
|
src = img.get("src")
|
|
if src:
|
|
tasks.append(download_image(session, src))
|
|
imgs_to_process.append(img)
|
|
|
|
if tasks:
|
|
local_paths = await asyncio.gather(*tasks)
|
|
for img, local_path in zip(imgs_to_process, local_paths):
|
|
if local_path.startswith("http"): continue
|
|
rel_path = os.path.relpath(os.path.join(OUTPUT_DIR, local_path), current_file_dir)
|
|
img["src"] = rel_path
|
|
|
|
return str(soup)
|
|
|
|
def save_markdown(path, title, html_content, source_url):
|
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
|
|
|
# Converter HTML para Markdown
|
|
markdown_content = md(html_content, heading_style="ATX", bullets="-")
|
|
|
|
# Converter as marcações de Admonition que injetamos
|
|
markdown_content = markdown_content.replace("> !!!", "!!!")
|
|
|
|
# Sanitização de blocos de código
|
|
markdown_content = sanitize_code_blocks(markdown_content)
|
|
|
|
header = f"---\ntitle: {title}\nsource: {source_url}\npath: {path.replace(OUTPUT_DIR, '')}\n---\n\n"
|
|
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
f.write(header + markdown_content)
|
|
|
|
async def build_tree_map(session, page_id, current_path, url_map):
|
|
"""Primeiro passo: mapear todas as URLs para caminhos locais."""
|
|
title, _, web_ui, tiny_ui, _ = await get_page_content(session, page_id)
|
|
if not title: return
|
|
|
|
safe_title = clean_filename(title)
|
|
file_path = os.path.join(current_path, f"{safe_title}.md")
|
|
|
|
urls_to_map = [web_ui, urljoin(BASE_URL, web_ui), tiny_ui, urljoin(BASE_URL, tiny_ui)]
|
|
if web_ui:
|
|
public_ui = web_ui.replace("/display/fluig/", "/display/public/fluig/")
|
|
urls_to_map.extend([public_ui, urljoin(BASE_URL, public_ui)])
|
|
|
|
id_ui = f"/pages/viewpage.action?pageId={page_id}"
|
|
urls_to_map.extend([id_ui, urljoin(BASE_URL, id_ui)])
|
|
|
|
for u in filter(None, urls_to_map):
|
|
url_map[u] = file_path
|
|
|
|
print(f"Mapeando: {title}")
|
|
|
|
children = await get_page_children(session, page_id)
|
|
new_path = os.path.join(current_path, safe_title)
|
|
|
|
tasks = [build_tree_map(session, child['id'], new_path, url_map) for child in children]
|
|
await asyncio.gather(*tasks)
|
|
|
|
async def extract_content(session, page_id, current_path, progress_data, url_map):
|
|
"""Segundo passo: baixar conteúdo e imagens."""
|
|
title, html, web_ui, tiny_ui, last_updated = await get_page_content(session, page_id)
|
|
if not title: return
|
|
|
|
safe_title = clean_filename(title)
|
|
file_dir = current_path
|
|
file_path = os.path.join(file_dir, f"{safe_title}.md")
|
|
|
|
# Extração Incremental
|
|
if page_id in progress_data and progress_data[page_id] == last_updated and os.path.exists(file_path):
|
|
print(f"Pulando (Inalterado): {title}")
|
|
else:
|
|
print(f"Extraindo: {title}")
|
|
processed_html = await process_links_and_images(session, html, file_dir, url_map)
|
|
source_url = urljoin(BASE_URL, web_ui) if web_ui else ""
|
|
save_markdown(file_path, title, processed_html, source_url)
|
|
|
|
progress_data[page_id] = last_updated
|
|
with open(PROGRESS_FILE, "w") as f:
|
|
json.dump(progress_data, f)
|
|
|
|
children = await get_page_children(session, page_id)
|
|
new_path = os.path.join(current_path, safe_title)
|
|
|
|
tasks = [extract_content(session, child['id'], new_path, progress_data, url_map) for child in children]
|
|
await asyncio.gather(*tasks)
|
|
|
|
def post_process_links(url_map):
|
|
print("\nIniciando pós-processamento de links locais...")
|
|
sorted_urls = sorted(url_map.keys(), key=len, reverse=True)
|
|
all_files = glob.glob(os.path.join(OUTPUT_DIR, "**", "*.md"), recursive=True)
|
|
|
|
# Regex para links Markdown que suporta parênteses aninhados
|
|
link_pattern = re.compile(r"\[(.*?)\]\(((?:[^()]+|\([^()]*\))*)\)")
|
|
|
|
for local_file_path in all_files:
|
|
if not os.path.isfile(local_file_path): continue
|
|
with open(local_file_path, "r", encoding="utf-8") as f:
|
|
content = f.read()
|
|
|
|
original_content = content
|
|
current_dir = os.path.dirname(local_file_path)
|
|
|
|
def replace_url_in_link(match):
|
|
text = match.group(1)
|
|
url = match.group(2)
|
|
|
|
for target_url in sorted_urls:
|
|
if target_url in url:
|
|
target_local_path = url_map[target_url]
|
|
if os.path.abspath(target_local_path) == os.path.abspath(local_file_path): continue
|
|
|
|
rel_link = os.path.relpath(target_local_path, current_dir).replace("\\", "/")
|
|
|
|
# Se o link for uma sub-página mas o pai foi substituído por .md
|
|
# Corrigimos para manter a estrutura de diretório
|
|
if url.startswith(target_url + "/"):
|
|
parent_dir = rel_link.replace(".md", "")
|
|
new_url = url.replace(target_url, parent_dir)
|
|
return f"[{text}]({new_url})"
|
|
|
|
return f"[{text}]({rel_link})"
|
|
return match.group(0)
|
|
|
|
content = link_pattern.sub(replace_url_in_link, content)
|
|
|
|
if content != original_content:
|
|
with open(local_file_path, "w", encoding="utf-8") as f:
|
|
f.write(content)
|
|
|
|
async def main():
|
|
if not os.path.exists(IMAGES_DIR):
|
|
os.makedirs(IMAGES_DIR)
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
url_map = {}
|
|
if os.path.exists(URL_MAP_FILE):
|
|
with open(URL_MAP_FILE, "r") as f:
|
|
url_map = json.load(f)
|
|
else:
|
|
print("Fase 1: Mapeando árvore de URLs...")
|
|
await build_tree_map(session, ROOT_PAGE_ID, OUTPUT_DIR, url_map)
|
|
with open(URL_MAP_FILE, "w") as f:
|
|
json.dump(url_map, f)
|
|
|
|
progress_data = {}
|
|
if os.path.exists(PROGRESS_FILE):
|
|
with open(PROGRESS_FILE, "r") as f:
|
|
progress_data = json.load(f)
|
|
|
|
print("\nFase 2: Extraindo conteúdo e imagens...")
|
|
await extract_content(session, ROOT_PAGE_ID, OUTPUT_DIR, progress_data, url_map)
|
|
|
|
post_process_links(url_map)
|
|
print("\nWiki local 100% Offline concluída!")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|