import os
import asyncio
import aiohttp
import time
from bs4 import BeautifulSoup
from markdownify import markdownify as md
import re
import json
from urllib.parse import urljoin, urlparse
import glob

# Configuration
BASE_URL = "https://tdn.totvs.com"
API_URL = f"{BASE_URL}/rest/api/content"
ROOT_PAGE_ID = "653566687"  # "Documentação Técnica" root page
OUTPUT_DIR = "fluig_rag_docs"
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")
CONCURRENCY_LIMIT = 10
DELAY = 0.1
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}

# State files for incremental runs
PROGRESS_FILE = "extraction_progress.json"
URL_MAP_FILE = "url_to_path_map.json"

semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)


async def fetch_json(session, url, params=None):
    async with semaphore:
        try:
            async with session.get(url, params=params, headers=HEADERS, timeout=30) as response:
                if response.status == 404:
                    return None
                response.raise_for_status()
                return await response.json()
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None


async def get_page_children(session, page_id):
    # Note: only the first page of results is returned (API default limit);
    # pass a "limit" param or paginate if a page has many children.
    url = f"{API_URL}/{page_id}/child/page"
    data = await fetch_json(session, url)
    return data.get('results', []) if data else []


async def get_page_content(session, page_id):
    url = f"{API_URL}/{page_id}?expand=body.export_view,history.lastUpdated"
    data = await fetch_json(session, url)
    if not data:
        return None, None, None, None, None
    title = data.get('title')
    html = data.get('body', {}).get('export_view', {}).get('value', '')
    last_updated = data.get('history', {}).get('lastUpdated', {}).get('when', '')
    links = data.get('_links', {})
    web_ui = links.get('webui', "")
    tiny_ui = links.get('tinyui', "")
    return title, html, web_ui, tiny_ui, last_updated


def clean_filename(filename):
    return re.sub(r'[\\/*?:"<>|]', "", filename)


async def download_image(session, img_url):
    """Download an image and return its local path relative to OUTPUT_DIR."""
    if not img_url.startswith("http"):
        img_url = urljoin(BASE_URL, img_url)
    parsed_url = urlparse(img_url)
    img_name = clean_filename(os.path.basename(parsed_url.path))
    if not img_name:
        img_name = f"img_{hash(img_url)}.png"
    local_path = os.path.join(IMAGES_DIR, img_name)
    if os.path.exists(local_path):
        return os.path.join("images", img_name)
    async with semaphore:
        try:
            async with session.get(img_url, headers=HEADERS, timeout=20) as response:
                if response.status != 200:
                    return img_url
                img_data = await response.read()
                with open(local_path, "wb") as f:
                    f.write(img_data)
                return os.path.join("images", img_name)
        except Exception as e:
            print(f"Error downloading image {img_url}: {e}")
            return img_url


def treat_macros(soup):
    """Handle Confluence-specific macros to produce cleaner Markdown."""
    # Expand macro -> HTML <details>
    for expand_div in soup.find_all("div", class_="expand-container"):
        title_div = expand_div.find("div", class_="expand-control")
        content_div = expand_div.find("div", class_="expand-content")
        if title_div and content_div:
            title_text = title_div.get_text(strip=True) or "Expandir"
            new_tag = soup.new_tag("details")
            summary = soup.new_tag("summary")
            summary.string = title_text
            new_tag.append(summary)
            # Keep the inner content
            new_tag.append(content_div)
            expand_div.replace_with(new_tag)

    # Info/Warning macros
    macro_mapping = {
        "confluence-information-macro-information": "info",
        "confluence-information-macro-note": "note",
        "confluence-information-macro-warning": "warning",
        "confluence-information-macro-tip": "tip"
    }
class_="confluence-information-macro"): m_type = "info" classes = macro.get("class", []) for cls, target in macro_mapping.items(): if cls in classes: m_type = target break # Injetar uma marcação que podemos converter depois ou deixar como blockquote # O markdownify converterá
para > # Vamos tentar algo que o MkDocs Admonition reconheça se possível, # ou apenas deixar mais legível. title_span = macro.find("span", class_="confluence-information-macro-title") title = f"**{title_span.get_text(strip=True)}**\n\n" if title_span else "" blockquote = soup.new_tag("blockquote") content_body = macro.find("div", class_="confluence-information-macro-body") if content_body: # Prefixar com o tipo para facilitar pós-processamento de Admonition content_body.insert(0, BeautifulSoup(f"!!! {m_type}
", "html.parser")) blockquote.append(content_body) macro.replace_with(blockquote) return soup def sanitize_code_blocks(markdown_content): """Tenta inferir a linguagem de blocos de código sem linguagem definida.""" def replace_code(match): lang = match.group(1).strip() code = match.group(2) if not lang or lang == "java": # Confluence às vezes erra java vs js if any(x in code for x in ["DatasetBuilder", "createDataset", "displayFields", "getSelectedZoomItem"]): lang = "javascript" elif any(x in code for x in ["PreparedStatement", "ResultSet", "DriverManager"]): lang = "java" elif "SELECT" in code.upper() and "FROM" in code.upper(): lang = "sql" return f"```{lang}\n{code}\n```" pattern = re.compile(r"```(.*?)\n(.*?)\n```", re.DOTALL) return pattern.sub(replace_code, markdown_content) async def process_links_and_images(session, html, current_file_dir, url_map): """Processa HTML para baixar imagens e preparar links locais.""" soup = BeautifulSoup(html, "html.parser") soup = treat_macros(soup) tasks = [] imgs_to_process = [] for img in soup.find_all("img"): src = img.get("src") if src: tasks.append(download_image(session, src)) imgs_to_process.append(img) if tasks: local_paths = await asyncio.gather(*tasks) for img, local_path in zip(imgs_to_process, local_paths): if local_path.startswith("http"): continue rel_path = os.path.relpath(os.path.join(OUTPUT_DIR, local_path), current_file_dir) img["src"] = rel_path return str(soup) def save_markdown(path, title, html_content, source_url): os.makedirs(os.path.dirname(path), exist_ok=True) # Converter HTML para Markdown markdown_content = md(html_content, heading_style="ATX", bullets="-") # Converter as marcações de Admonition que injetamos markdown_content = markdown_content.replace("> !!!", "!!!") # Sanitização de blocos de código markdown_content = sanitize_code_blocks(markdown_content) header = f"---\ntitle: {title}\nsource: {source_url}\npath: {path.replace(OUTPUT_DIR, '')}\n---\n\n" with open(path, "w", encoding="utf-8") as f: f.write(header + markdown_content) async def build_tree_map(session, page_id, current_path, url_map): """Primeiro passo: mapear todas as URLs para caminhos locais.""" title, _, web_ui, tiny_ui, _ = await get_page_content(session, page_id) if not title: return safe_title = clean_filename(title) file_path = os.path.join(current_path, f"{safe_title}.md") urls_to_map = [web_ui, urljoin(BASE_URL, web_ui), tiny_ui, urljoin(BASE_URL, tiny_ui)] if web_ui: public_ui = web_ui.replace("/display/fluig/", "/display/public/fluig/") urls_to_map.extend([public_ui, urljoin(BASE_URL, public_ui)]) id_ui = f"/pages/viewpage.action?pageId={page_id}" urls_to_map.extend([id_ui, urljoin(BASE_URL, id_ui)]) for u in filter(None, urls_to_map): url_map[u] = file_path print(f"Mapeando: {title}") children = await get_page_children(session, page_id) new_path = os.path.join(current_path, safe_title) tasks = [build_tree_map(session, child['id'], new_path, url_map) for child in children] await asyncio.gather(*tasks) async def extract_content(session, page_id, current_path, progress_data, url_map): """Segundo passo: baixar conteúdo e imagens.""" title, html, web_ui, tiny_ui, last_updated = await get_page_content(session, page_id) if not title: return safe_title = clean_filename(title) file_dir = current_path file_path = os.path.join(file_dir, f"{safe_title}.md") # Extração Incremental if page_id in progress_data and progress_data[page_id] == last_updated and os.path.exists(file_path): print(f"Pulando (Inalterado): {title}") else: print(f"Extraindo: {title}") 
        processed_html = await process_links_and_images(session, html, file_dir, url_map)
        source_url = urljoin(BASE_URL, web_ui) if web_ui else ""
        save_markdown(file_path, title, processed_html, source_url)
        progress_data[page_id] = last_updated
        with open(PROGRESS_FILE, "w") as f:
            json.dump(progress_data, f)

    children = await get_page_children(session, page_id)
    new_path = os.path.join(current_path, safe_title)
    tasks = [extract_content(session, child['id'], new_path, progress_data, url_map) for child in children]
    await asyncio.gather(*tasks)


def post_process_links(url_map):
    print("\nStarting post-processing of local links...")
    sorted_urls = sorted(url_map.keys(), key=len, reverse=True)
    all_files = glob.glob(os.path.join(OUTPUT_DIR, "**", "*.md"), recursive=True)
    # Regex for Markdown links that supports parentheses nested one level deep in the URL
    link_pattern = re.compile(r"\[(.*?)\]\(((?:[^()]+|\([^()]*\))*)\)")
    for local_file_path in all_files:
        if not os.path.isfile(local_file_path):
            continue
        with open(local_file_path, "r", encoding="utf-8") as f:
            content = f.read()
        original_content = content
        current_dir = os.path.dirname(local_file_path)

        def replace_url_in_link(match):
            text = match.group(1)
            url = match.group(2)
            for target_url in sorted_urls:
                if target_url in url:
                    target_local_path = url_map[target_url]
                    if os.path.abspath(target_local_path) == os.path.abspath(local_file_path):
                        continue
                    rel_link = os.path.relpath(target_local_path, current_dir).replace("\\", "/")
                    # If the link points to a sub-page of a mapped page, keep the
                    # directory structure instead of pointing at the parent's .md file
                    if url.startswith(target_url + "/"):
                        parent_dir = rel_link.replace(".md", "")
                        new_url = url.replace(target_url, parent_dir)
                        return f"[{text}]({new_url})"
                    return f"[{text}]({rel_link})"
            return match.group(0)

        content = link_pattern.sub(replace_url_in_link, content)
        if content != original_content:
            with open(local_file_path, "w", encoding="utf-8") as f:
                f.write(content)


async def main():
    if not os.path.exists(IMAGES_DIR):
        os.makedirs(IMAGES_DIR)
    async with aiohttp.ClientSession() as session:
        url_map = {}
        if os.path.exists(URL_MAP_FILE):
            with open(URL_MAP_FILE, "r") as f:
                url_map = json.load(f)
        else:
            print("Phase 1: Mapping the URL tree...")
            await build_tree_map(session, ROOT_PAGE_ID, OUTPUT_DIR, url_map)
            with open(URL_MAP_FILE, "w") as f:
                json.dump(url_map, f)

        progress_data = {}
        if os.path.exists(PROGRESS_FILE):
            with open(PROGRESS_FILE, "r") as f:
                progress_data = json.load(f)

        print("\nPhase 2: Extracting content and images...")
        await extract_content(session, ROOT_PAGE_ID, OUTPUT_DIR, progress_data, url_map)
        post_process_links(url_map)
        print("\nLocal 100% offline wiki complete!")


if __name__ == "__main__":
    asyncio.run(main())
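
# Illustrative sketch (not executed): for a hypothetical page containing an "info"
# macro, save_markdown writes a file roughly like the one below. The title, URL,
# and body text are made-up examples, not real TDN content.
#
#   ---
#   title: Example Page
#   source: https://tdn.totvs.com/display/fluig/Example+Page
#   path: /Example Page.md
#   ---
#
#   !!! info
#   > **Nota**
#   >
#   > Body text converted from the Confluence information macro...
#
# Only the injected "!!! <type>" line is un-quoted by the "> !!!" replacement in
# save_markdown; the rest of the macro body remains a plain, readable blockquote.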